Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Introduce a native transport for linux using epoll ET

This transport use JNI (C) to directly make use of epoll in Edge-Triggered mode for maximal performance on Linux. Beside this it also support using TCP_CORK and produce less GC then the NIO transport using JDK NIO.
It only builds on linux and skip the build if linux is not used. The transport produce a jar which contains all needed .so files for 32bit and 64 bit. The user only need to include the jar as dependency as usually
to make use of it and use the correct classes.

This includes also some cleanup of @trustin
  • Loading branch information...
commit e0299e12228ef5d6791e0e8e722a5788ff7f617e 1 parent c73e1e3
@normanmaurer normanmaurer authored
Showing with 3,643 additions and 1 deletion.
  1. +149 −0 common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java
  2. +143 −0 common/src/main/java/io/netty/util/internal/PlatformDependent.java
  3. +16 −1 pom.xml
  4. +3 −0  transport-native-epoll/README.md
  5. +113 −0 transport-native-epoll/pom.xml
  6. +859 −0 transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c
  7. +60 −0 transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h
  8. +163 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java
  9. +27 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java
  10. +356 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java
  11. +75 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java
  12. +133 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java
  13. +176 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java
  14. +590 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java
  15. +274 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java
  16. +136 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java
  17. +21 −0 transport-native-epoll/src/main/java/io/netty/channel/epoll/package-info.java
  18. +47 −0 transport-native-epoll/src/main/native-package/m4/custom.m4
  19. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketEchoTest.java
  20. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFileRegionTest.java
  21. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketFixedLengthEchoTest.java
  22. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketGatheringWriteTest.java
  23. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketObjectEchoTest.java
  24. +39 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslEchoTest.java
  25. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStartTlsTest.java
  26. +31 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketStringEchoTest.java
  27. +46 −0 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTestUtils.java
View
149 common/src/main/java/io/netty/util/internal/NativeLibraryLoader.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.util.internal;
+
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.Locale;
+import java.util.regex.Pattern;
+
+/**
+ * Helper class to load JNI resources.
+ *
+ */
+public final class NativeLibraryLoader {
+
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(NativeLibraryLoader.class);
+
+ private static final Pattern REPLACE = Pattern.compile("\\W+");
+ private static final File WORKDIR;
+
+ static {
+ String workdir = SystemPropertyUtil.get("io.netty.native.workdir");
+ if (workdir != null) {
+ File f = new File(workdir);
+ if (!f.exists()) {
+ // ok to ignore as createTempFile will take care
+ //noinspection ResultOfMethodCallIgnored
+ f.mkdirs();
+ }
+
+ try {
+ f = f.getAbsoluteFile();
+ } catch (Exception ignored) {
+ // Good to have an absolute path, but it's OK.
+ }
+
+ WORKDIR = f;
+ logger.debug("-Dio.netty.netty.workdir: {}", WORKDIR);
+ } else {
+ WORKDIR = PlatformDependent.tmpdir();
+ logger.debug("-Dio.netty.netty.workdir: {} (io.netty.tmpdir)", WORKDIR);
+ }
+ }
+
+ /**
+ * Load the given library with the specified {@link java.lang.ClassLoader}
+ */
+ public static void load(String name, ClassLoader loader) {
+ String libname = System.mapLibraryName(name);
+ String path = "META-INF/native/" + osIdentifier() + PlatformDependent.bitMode() + '/' + libname;
+
+ URL url = loader.getResource(path);
+ if (url == null) {
+ // Fall back to normal loading of JNI stuff
+ System.loadLibrary(name);
+ } else {
+ int index = libname.lastIndexOf('.');
+ String prefix = libname.substring(0, index);
+ String suffix = libname.substring(index, libname.length());
+ InputStream in = null;
+ OutputStream out = null;
+ File tmpFile = null;
+ boolean loaded = false;
+ try {
+ tmpFile = File.createTempFile(prefix, suffix, WORKDIR);
+ in = url.openStream();
+ out = new FileOutputStream(tmpFile);
+
+ byte[] buffer = new byte[8192];
+ int length;
+ while ((length = in.read(buffer)) > 0) {
+ out.write(buffer, 0, length);
+ }
+ out.flush();
+ out.close();
+ out = null;
+
+ System.load(tmpFile.getPath());
+ loaded = true;
+ } catch (Exception e) {
+ throw (UnsatisfiedLinkError) new UnsatisfiedLinkError(
+ "could not load a native library: " + name).initCause(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException ignore) {
+ // ignore
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException ignore) {
+ // ignore
+ }
+ }
+ if (tmpFile != null) {
+ if (loaded) {
+ tmpFile.deleteOnExit();
+ } else {
+ if (!tmpFile.delete()) {
+ tmpFile.deleteOnExit();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static String osIdentifier() {
+ String name = SystemPropertyUtil.get("os.name", "unknown").toLowerCase(Locale.US).trim();
+ if (name.startsWith("win")) {
+ return "windows";
+ }
+ if (name.startsWith("mac os x")) {
+ return "osx";
+ }
+ if (name.startsWith("linux")) {
+ return "linux";
+ }
+
+ return REPLACE.matcher(name).replaceAll("_");
+ }
+
+ private NativeLibraryLoader() {
+ // Utility
+ }
+}
View
143 common/src/main/java/io/netty/util/internal/PlatformDependent.java
@@ -21,6 +21,7 @@
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.BufferedReader;
+import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
@@ -74,6 +75,10 @@
private static final boolean HAS_JAVASSIST = hasJavassist0();
+ private static final File TMPDIR = tmpdir0();
+
+ private static final int BIT_MODE = bitMode0();
+
static {
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
@@ -154,6 +159,20 @@ public static boolean hasJavassist() {
}
/**
+ * Returns the temporary directory.
+ */
+ public static File tmpdir() {
+ return TMPDIR;
+ }
+
+ /**
+ * Returns the bit mode of the current VM (usually 32 or 64.)
+ */
+ public static int bitMode() {
+ return BIT_MODE;
+ }
+
+ /**
* Raises an exception bypassing compiler checks for checked exceptions.
*/
public static void throwException(Throwable t) {
@@ -638,6 +657,130 @@ private static boolean hasJavassist0() {
}
}
+ private static File tmpdir0() {
+ File f;
+ try {
+ f = toDirectory(SystemPropertyUtil.get("io.netty.tmpdir"));
+ if (f != null) {
+ logger.debug("-Dio.netty.tmpdir: {}", f);
+ return f;
+ }
+
+ f = toDirectory(SystemPropertyUtil.get("java.io.tmpdir"));
+ if (f != null) {
+ logger.debug("-Dio.netty.tmpdir: {} (java.io.tmpdir)", f);
+ return f;
+ }
+
+ // This shouldn't happen, but just in case ..
+ if (isWindows()) {
+ f = toDirectory(System.getenv("TEMP"));
+ if (f != null) {
+ logger.debug("-Dio.netty.tmpdir: {} (%TEMP%)", f);
+ return f;
+ }
+
+ String userprofile = System.getenv("USERPROFILE");
+ if (userprofile != null) {
+ f = toDirectory(userprofile + "\\AppData\\Local\\Temp");
+ if (f != null) {
+ logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\AppData\\Local\\Temp)", f);
+ return f;
+ }
+
+ f = toDirectory(userprofile + "\\Local Settings\\Temp");
+ if (f != null) {
+ logger.debug("-Dio.netty.tmpdir: {} (%USERPROFILE%\\Local Settings\\Temp)", f);
+ return f;
+ }
+ }
+ } else {
+ f = toDirectory(System.getenv("TMPDIR"));
+ if (f != null) {
+ logger.debug("-Dio.netty.tmpdir: {} ($TMPDIR)", f);
+ return f;
+ }
+ }
+ } catch (Exception ignored) {
+ // Environment variable inaccessible
+ }
+
+ // Last resort.
+ if (isWindows()) {
+ f = new File("C:\\Windows\\Temp");
+ } else {
+ f = new File("/tmp");
+ }
+
+ logger.warn("Failed to get the temporary directory; falling back to: {}", f);
+ return f;
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private static File toDirectory(String path) {
+ if (path == null) {
+ return null;
+ }
+
+ File f = new File(path);
+ if (!f.exists()) {
+ f.mkdirs();
+ }
+
+ if (!f.isDirectory()) {
+ return null;
+ }
+
+ try {
+ return f.getAbsoluteFile();
+ } catch (Exception ignored) {
+ return f;
+ }
+ }
+
+ private static int bitMode0() {
+ // Check user-specified bit mode first.
+ int bitMode = SystemPropertyUtil.getInt("io.netty.bitMode", 0);
+ if (bitMode > 0) {
+ logger.debug("-Dio.netty.bitMode: {}", bitMode);
+ return bitMode;
+ }
+
+ // And then the vendor specific ones which is probably most reliable.
+ bitMode = SystemPropertyUtil.getInt("sun.arch.data.model", 0);
+ if (bitMode > 0) {
+ logger.debug("-Dio.netty.bitMode: {} (sun.arch.data.model)", bitMode);
+ return bitMode;
+ }
+ bitMode = SystemPropertyUtil.getInt("com.ibm.vm.bitmode", 0);
+ if (bitMode > 0) {
+ logger.debug("-Dio.netty.bitMode: {} (com.ibm.vm.bitmode)", bitMode);
+ return bitMode;
+ }
+
+ // os.arch also gives us a good hint.
+ String arch = SystemPropertyUtil.get("os.arch", "").toLowerCase(Locale.US).trim();
+ if ("amd64".equals(arch) || "x86_64".equals(arch)) {
+ bitMode = 64;
+ } else if ("i386".equals(arch) || "i486".equals(arch) || "i586".equals(arch) || "i686".equals(arch)) {
+ bitMode = 32;
+ }
+
+ if (bitMode > 0) {
+ logger.debug("-Dio.netty.bitMode: {} (os.arch: {})", bitMode, arch);
+ }
+
+ // Last resort: guess from VM name and then fall back to most common 64-bit mode.
+ String vm = SystemPropertyUtil.get("java.vm.name", "").toLowerCase(Locale.US);
+ Pattern BIT_PATTERN = Pattern.compile("([1-9][0-9]+)-?bit");
+ Matcher m = BIT_PATTERN.matcher(vm);
+ if (m.find()) {
+ return Integer.parseInt(m.group(1));
+ } else {
+ return 64;
+ }
+ }
+
private PlatformDependent() {
// only static method supported
}
View
17 pom.xml
@@ -120,6 +120,17 @@
<maven.javadoc.failOnError>false</maven.javadoc.failOnError>
</properties>
</profile>
+ <profile>
+ <id>linux-native</id>
+ <activation>
+ <os>
+ <family>linux</family>
+ </os>
+ </activation>
+ <modules>
+ <module>transport-native-epoll</module>
+ </modules>
+ </profile>
</profiles>
<properties>
@@ -499,6 +510,10 @@
<goal>manifest</goal>
</goals>
<configuration>
+ <supportedProjectTypes>
+ <supportedProjectType>jar</supportedProjectType>
+ <supportedProjectType>bundle</supportedProjectType>
+ </supportedProjectTypes>
<instructions>
<Export-Package>${project.groupId}.*</Export-Package>
<!-- enforce JVM vendor package as optional -->
@@ -565,7 +580,7 @@
<version>2.4.2</version>
<configuration>
<useReleaseProfile>false</useReleaseProfile>
- <arguments>-P release,sonatype-oss-release,full,no-osgi</arguments>
+ <arguments>-P release,sonatype-oss-release,full,no-osgi,linux-native</arguments>
<autoVersionSubmodules>true</autoVersionSubmodules>
<allowTimestampedSnapshots>false</allowTimestampedSnapshots>
<tagNameFormat>netty-@{project.version}</tagNameFormat>
View
3  transport-native-epoll/README.md
@@ -0,0 +1,3 @@
+# Native transport for Linux
+
+See [our wiki page](http://netty.io/wiki/native-transports.html).
View
113 transport-native-epoll/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="ISO-8859-15"?>
+<!--
+ ~ Copyright 2014 The Netty Project
+ ~
+ ~ The Netty Project licenses this file to you under the Apache License,
+ ~ version 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at:
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-parent</artifactId>
+ <version>5.0.0.Alpha2-SNAPSHOT</version>
+ </parent>
+ <artifactId>netty-transport-native-epoll</artifactId>
+
+ <name>Netty/Transport/Native/Epoll</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-testsuite</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.fusesource.hawtjni</groupId>
+ <artifactId>maven-hawtjni-plugin</artifactId>
+ <version>1.10</version>
+ <executions>
+ <execution>
+ <id>build-linux64</id>
+ <configuration>
+ <name>${project.artifactId}</name>
+ <buildDirectory>${project.build.directory}/linux64</buildDirectory>
+ <nativeSourceDirectory>${nativeSourceDirectory}</nativeSourceDirectory>
+ <libDirectory>${libDirectory}</libDirectory>
+ <configureArgs>
+ <arg>--with-arch=x86_64</arg>
+ </configureArgs>
+ <platform>linux64</platform>
+ <forceConfigure>true</forceConfigure>
+ <forceAutogen>true</forceAutogen>
+ </configuration>
+ <goals>
+ <goal>generate</goal>
+ <goal>build</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>build-linux32</id>
+ <configuration>
+ <buildDirectory>${project.build.directory}/linux32</buildDirectory>
+ <nativeSourceDirectory>${nativeSourceDirectory}</nativeSourceDirectory>
+ <libDirectory>${libDirectory}</libDirectory>
+ <name>${project.artifactId}</name>
+ <configureArgs>
+ <arg>--with-arch=i386</arg>
+ </configureArgs>
+ <platform>linux32</platform>
+ <forceConfigure>true</forceConfigure>
+ <forceAutogen>true</forceAutogen>
+ </configuration>
+ <goals>
+ <goal>generate</goal>
+ <goal>build</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <properties>
+ <nativeSourceDirectory>${basedir}/src/main/c</nativeSourceDirectory>
+ <libDirectory>${basedir}/target/classes/</libDirectory>
+ </properties>
+</project>
View
859 transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c
@@ -0,0 +1,859 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+#include <jni.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <sys/sendfile.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include "io_netty_channel_epoll_Native.h"
+
+
+// optional
+extern int accept4(int sockFd, struct sockaddr *addr, socklen_t *addrlen, int flags) __attribute__((weak));
+
+// Those are initialized in the init(...) method and cached for performance reasons
+jmethodID posId = NULL;
+jmethodID limitId = NULL;
+jfieldID posFieldId = NULL;
+jfieldID limitFieldId = NULL;
+jfieldID fileChannelFieldId = NULL;
+jfieldID transferedFieldId = NULL;
+jfieldID fdFieldId = NULL;
+jfieldID fileDescriptorFieldId = NULL;
+jmethodID inetSocketAddrMethodId = NULL;
+jclass runtimeExceptionClass = NULL;
+jclass ioExceptionClass = NULL;
+jclass closedChannelExceptionClass = NULL;
+jmethodID closedChannelExceptionMethodId = NULL;
+jclass inetSocketAddressClass = NULL;
+static int socketType;
+
+// util methods
+void throwRuntimeException(JNIEnv *env, char *message) {
+ (*env)->ThrowNew(env, runtimeExceptionClass, message);
+}
+
+void throwIOException(JNIEnv *env, char *message) {
+ (*env)->ThrowNew(env, ioExceptionClass, message);
+}
+
+void throwClosedChannelException(JNIEnv *env) {
+ jobject exception = (*env)->NewObject(env, closedChannelExceptionClass, closedChannelExceptionMethodId);
+ (*env)->Throw(env, exception);
+}
+
+void throwOutOfMemoryError( JNIEnv *env, char *message) {
+ jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError");
+ (*env)->ThrowNew(env, exceptionClass, message);
+}
+
+char *exceptionMessage(char *msg, int error) {
+ char *err = strerror(error);
+ char *result = malloc(strlen(msg) + strlen(err) + 1);
+ strcpy(result, msg);
+ strcat(result, err);
+ return result;
+}
+
+jint epollCtl(JNIEnv * env, jint efd, int op, jint fd, jint flags, jint id) {
+ uint32_t events = EPOLLET;
+
+ if (flags & EPOLL_ACCEPT) {
+ events |= EPOLLIN;
+ }
+ if (flags & EPOLL_READ) {
+ events |= EPOLLIN | EPOLLRDHUP;
+ }
+ if (flags & EPOLL_WRITE) {
+ events |= EPOLLOUT;
+ }
+
+ struct epoll_event ev = {
+ .events = events,
+ // encode the id into the events
+ .data.u64 = (((uint64_t) id) << 32L)
+ };
+
+ return epoll_ctl(efd, op, fd, &ev);
+}
+
+jint getOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t optlen) {
+ int code;
+ code = getsockopt(fd, level, optname, &optval, &optlen);
+ if (code == 0) {
+ return 0;
+ }
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error during getsockopt(...): ", err));
+ return code;
+}
+
+int setOption(JNIEnv *env, jint fd, int level, int optname, const void *optval, socklen_t len) {
+ int rc = setsockopt(fd, level, optname, optval, len);
+ if (rc < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error during setsockopt(...): ", err));
+ }
+ return rc;
+}
+
+jobject createInetSocketAddress(JNIEnv * env, struct sockaddr_storage addr) {
+ char ipstr[INET6_ADDRSTRLEN];
+ int port;
+ if (addr.ss_family == AF_INET) {
+ struct sockaddr_in *s = (struct sockaddr_in *)&addr;
+ port = ntohs(s->sin_port);
+ inet_ntop(AF_INET, &s->sin_addr, ipstr, sizeof ipstr);
+ } else {
+ struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
+ port = ntohs(s->sin6_port);
+ inet_ntop(AF_INET6, &s->sin6_addr, ipstr, sizeof ipstr);
+ }
+ jobject socketAddr = (*env)->NewObject(env, inetSocketAddressClass, inetSocketAddrMethodId, ipstr, port);
+ return socketAddr;
+}
+
+void init_sockaddr(JNIEnv * env, jbyteArray address, jint scopeId, jint jport, struct sockaddr_storage * addr) {
+ uint16_t port = htons((uint16_t) jport);
+ jbyte* addressBytes = (*env)->GetByteArrayElements(env, address, 0);
+ if (socketType == AF_INET6) {
+ struct sockaddr_in6* ip6addr = (struct sockaddr_in6 *) addr;
+ ip6addr->sin6_family = AF_INET6;
+ ip6addr->sin6_port = port;
+
+ if (scopeId != 0) {
+ ip6addr->sin6_scope_id = (uint32_t) scopeId;
+ }
+ memcpy( &(ip6addr->sin6_addr.s6_addr), addressBytes, 16);
+ } else {
+ struct sockaddr_in* ipaddr = (struct sockaddr_in *) addr;
+ ipaddr->sin_family = AF_INET;
+ ipaddr->sin_port = port;
+ memcpy( &(ipaddr->sin_addr.s_addr), addressBytes + 12, 4);
+ }
+
+ (*env)->ReleaseByteArrayElements(env, address, addressBytes, JNI_ABORT);
+}
+
+static int socket_type() {
+ int fd = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
+ if (fd == -1) {
+ if (errno == EAFNOSUPPORT) {
+ return AF_INET;
+ }
+ return AF_INET6;
+ } else {
+ close(fd);
+ return AF_INET6;
+ }
+}
+// util methods end
+
+jint JNI_OnLoad(JavaVM* vm, void* reserved) {
+ JNIEnv* env;
+ if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) {
+ return JNI_ERR;
+ } else {
+ // cache classes that are used within other jni methods for performance reasons
+ jclass localClosedChannelExceptionClass = (*env)->FindClass(env, "java/nio/channels/ClosedChannelException");
+ if (localClosedChannelExceptionClass == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ closedChannelExceptionClass = (jclass) (*env)->NewGlobalRef(env, localClosedChannelExceptionClass);
+ if (closedChannelExceptionClass == NULL) {
+ // out-of-memory!
+ throwOutOfMemoryError(env, "Error allocating memory");
+ return JNI_ERR;
+ }
+ closedChannelExceptionMethodId = (*env)->GetMethodID(env, closedChannelExceptionClass, "<init>", "()V");
+ if (closedChannelExceptionMethodId == NULL) {
+ throwRuntimeException(env, "Unable to obtain constructor of ClosedChannelException");
+ return JNI_ERR;
+ }
+ jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException");
+ if (localRuntimeExceptionClass == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass);
+ if (runtimeExceptionClass == NULL) {
+ // out-of-memory!
+ throwOutOfMemoryError(env, "Error allocating memory");
+ return JNI_ERR;
+ }
+
+ jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException");
+ if (localIoExceptionClass == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass);
+ if (ioExceptionClass == NULL) {
+ // out-of-memory!
+ throwOutOfMemoryError(env, "Error allocating memory");
+ return JNI_ERR;
+ }
+
+ jclass localInetSocketAddressClass = (*env)->FindClass(env, "java/net/InetSocketAddress");
+ if (localIoExceptionClass == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ inetSocketAddressClass = (jclass) (*env)->NewGlobalRef(env, localInetSocketAddressClass);
+ if (inetSocketAddressClass == NULL) {
+ // out-of-memory!
+ throwOutOfMemoryError(env, "Error allocating memory");
+ return JNI_ERR;
+ }
+
+ void *mem = malloc(1);
+ if (mem == NULL) {
+ throwOutOfMemoryError(env, "Error allocating native buffer");
+ return JNI_ERR;
+ }
+ jobject directBuffer = (*env)->NewDirectByteBuffer(env, mem, 1);
+ if (directBuffer == NULL) {
+ throwOutOfMemoryError(env, "Error allocating native buffer");
+ return JNI_ERR;
+ }
+
+ jclass cls = (*env)->GetObjectClass(env, directBuffer);
+
+ // Get the method id for Buffer.position() and Buffer.limit(). These are used as fallback if
+ // it is not possible to obtain the position and limit using the fields directly.
+ posId = (*env)->GetMethodID(env, cls, "position", "()I");
+ if (posId == NULL) {
+ // position method was not found.. something is wrong so bail out
+ throwRuntimeException(env, "Unable to find method ByteBuffer.position()");
+ return JNI_ERR;
+ }
+
+ limitId = (*env)->GetMethodID(env, cls, "limit", "()I");
+ if (limitId == NULL) {
+ // limit method was not found.. something is wrong so bail out
+ throwRuntimeException(env, "Unable to find method ByteBuffer.limit()");
+ return JNI_ERR;
+ }
+
+ // Try to get the ids of the position and limit fields. We later then check if we was able
+ // to find them and if so use them get the position and limit of the buffer. This is
+ // much faster then call back into java via (*env)->CallIntMethod(...).
+ posFieldId = (*env)->GetFieldID(env, cls, "position", "I");
+ if (posFieldId == NULL) {
+ // this is ok as we can still use the method so just clear the exception
+ (*env)->ExceptionClear(env);
+ }
+ limitFieldId = (*env)->GetFieldID(env, cls, "limit", "I");
+ if (limitFieldId == NULL) {
+ // this is ok as we can still use the method so just clear the exception
+ (*env)->ExceptionClear(env);
+ }
+ jclass fileRegionCls = (*env)->FindClass(env, "io/netty/channel/DefaultFileRegion");
+ if (fileRegionCls == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ fileChannelFieldId = (*env)->GetFieldID(env, fileRegionCls, "file", "Ljava/nio/channels/FileChannel;");
+ if (fileChannelFieldId == NULL) {
+ throwRuntimeException(env, "Unable to obtain FileChannel field for DefaultFileRegion");
+ return JNI_ERR;
+ }
+ transferedFieldId = (*env)->GetFieldID(env, fileRegionCls, "transfered", "J");
+ if (transferedFieldId == NULL) {
+ throwRuntimeException(env, "Unable to obtain transfered field for DefaultFileRegion");
+ return JNI_ERR;
+ }
+
+ jclass fileChannelCls = (*env)->FindClass(env, "sun/nio/ch/FileChannelImpl");
+ if (fileChannelCls == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ fileDescriptorFieldId = (*env)->GetFieldID(env, fileChannelCls, "fd", "Ljava/io/FileDescriptor;");
+ if (fileDescriptorFieldId == NULL) {
+ throwRuntimeException(env, "Unable to obtain fd field for FileChannelImpl");
+ return JNI_ERR;
+ }
+
+ jclass fileDescriptorCls = (*env)->FindClass(env, "java/io/FileDescriptor");
+ if (fileDescriptorCls == NULL) {
+ // pending exception...
+ return JNI_ERR;
+ }
+ fdFieldId = (*env)->GetFieldID(env, fileDescriptorCls, "fd", "I");
+ if (fdFieldId == NULL) {
+ throwRuntimeException(env, "Unable to obtain fd field for FileDescriptor");
+ return JNI_ERR;
+ }
+
+ inetSocketAddrMethodId = (*env)->GetMethodID(env, inetSocketAddressClass, "<init>", "(Ljava/lang/String;I)V");
+ if (inetSocketAddrMethodId == NULL) {
+ throwRuntimeException(env, "Unable to obtain constructor of InetSocketAddress");
+ return JNI_ERR;
+ }
+ socketType = socket_type();
+ return JNI_VERSION_1_6;
+ }
+}
+
+void JNI_OnUnload(JavaVM *vm, void *reserved) {
+ JNIEnv* env;
+ if ((*vm)->GetEnv(vm, (void **) &env, JNI_VERSION_1_6) != JNI_OK) {
+ // Something is wrong but nothing we can do about this :(
+ return;
+ } else {
+ // delete global references so the GC can collect them
+ if (runtimeExceptionClass != NULL) {
+ (*env)->DeleteGlobalRef(env, runtimeExceptionClass);
+ }
+ if (ioExceptionClass != NULL) {
+ (*env)->DeleteGlobalRef(env, ioExceptionClass);
+ }
+ if (closedChannelExceptionClass != NULL) {
+ (*env)->DeleteGlobalRef(env, closedChannelExceptionClass);
+ }
+ if (inetSocketAddressClass != NULL) {
+ (*env)->DeleteGlobalRef(env, inetSocketAddressClass);
+ }
+ }
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz) {
+ jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+
+ if (eventFD < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error creating eventFD(...): ", err));
+ }
+ return eventFD;
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value) {
+ jint eventFD = eventfd_write(fd, (eventfd_t)value);
+
+ if (eventFD < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error calling eventfd_write(...): ", err));
+ }
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd) {
+ uint64_t eventfd_t;
+
+ if (eventfd_read(fd, &eventfd_t) != 0) {
+ // something is serious wrong
+ throwRuntimeException(env, "Error calling eventfd_read(...)");
+ }
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz) {
+ jint efd = epoll_create1(EPOLL_CLOEXEC);
+ if (efd < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error during epoll_create(...): ", err));
+ }
+ return efd;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout) {
+ int len = (*env)->GetArrayLength(env, events);
+ struct epoll_event ev[len];
+ int ready;
+ int err;
+ do {
+ ready = epoll_wait(efd, ev, len, timeout);
+ // was interrupted try again.
+ } while (ready == -1 && (( err = errno) == EINTR));
+
+ if (ready < 0) {
+ throwIOException(env, exceptionMessage("Error during epoll_wait(...): ", err));
+ return -1;
+ }
+ if (ready == 0) {
+ // nothing ready for process
+ return 0;
+ }
+
+ jboolean isCopy;
+ jlong *elements = (*env)->GetLongArrayElements(env, events, &isCopy);
+ if (elements == NULL) {
+ // No memory left ?!?!?
+ throwOutOfMemoryError(env, "Can't allocate memory");
+ return -1;
+ }
+ int i;
+ for (i = 0; i < ready; i++) {
+ // store the ready ops and id
+ elements[i] = (jlong) ev[i].data.u64;
+ if (ev[i].events & EPOLLIN) {
+ elements[i] |= EPOLL_READ;
+ }
+ if (ev[i].events & EPOLLRDHUP) {
+ elements[i] |= EPOLL_RDHUP;
+ }
+ if (ev[i].events & EPOLLOUT) {
+ elements[i] |= EPOLL_WRITE;
+ }
+ }
+ jint mode;
+ // release again to prevent memory leak
+ if (isCopy) {
+ mode = 0;
+ } else {
+ // was just pinned so use JNI_ABORT to eliminate not needed copy.
+ mode = JNI_ABORT;
+ }
+ (*env)->ReleaseLongArrayElements(env, events, elements, mode);
+
+ return ready;
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) {
+ if (epollCtl(env, efd, EPOLL_CTL_ADD, fd, flags, id) < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err));
+ }
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id) {
+ if (epollCtl(env, efd, EPOLL_CTL_MOD, fd, flags, id) < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err));
+ }
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd) {
+ // Create an empty event to workaround a bug in older kernels which can not handle NULL.
+ struct epoll_event event = { 0 };
+ if (epoll_ctl(efd, EPOLL_CTL_DEL, fd, &event) < 0) {
+ int err = errno;
+ throwRuntimeException(env, exceptionMessage("Error during calling epoll_ctl(...): ", err));
+ }
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) {
+ // TODO: We could also maybe pass the address in directly and so eliminate this call
+ // not sure if this would buy us much. So some testing is needed.
+ void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
+ if (buffer == NULL) {
+ throwRuntimeException(env, "Unable to access address of buffer");
+ return -1;
+ }
+ ssize_t res;
+ int err;
+ do {
+ res = write(fd, buffer + pos, (size_t) (limit - pos));
+ // keep on writing if it was interrupted
+ } while(res == -1 && ((err = errno) == EINTR));
+
+ if (res < 0) {
+ // network stack saturated... try again later
+ if (err == EAGAIN || err == EWOULDBLOCK) {
+ return 0;
+ }
+ if (err == EBADF) {
+ throwClosedChannelException(env);
+ return -1;
+ }
+ throwIOException(env, exceptionMessage("Error while write(...): ", err));
+ return -1;
+ }
+ return (jint) res;
+}
+
+JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
+ struct iovec iov[length];
+ int i;
+ int iovidx = 0;
+ for (i = offset; i < length; i++) {
+ jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, i);
+ jint pos;
+ // Get the current position using the (*env)->GetIntField if possible and fallback
+ // to slower (*env)->CallIntMethod(...) if needed
+ if (posFieldId == NULL) {
+ pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
+ } else {
+ pos = (*env)->GetIntField(env, bufObj, posFieldId);
+ }
+ jint limit;
+ // Get the current limit using the (*env)->GetIntField if possible and fallback
+ // to slower (*env)->CallIntMethod(...) if needed
+ if (limitFieldId == NULL) {
+ limit = (*env)->CallIntMethod(env, bufObj, limitId, NULL);
+ } else {
+ limit = (*env)->GetIntField(env, bufObj, limitFieldId);
+ }
+ void *buffer = (*env)->GetDirectBufferAddress(env, bufObj);
+ if (buffer == NULL) {
+ throwRuntimeException(env, "Unable to access address of buffer");
+ return -1;
+ }
+ iov[iovidx].iov_base = buffer + pos;
+ iov[iovidx].iov_len = (size_t) (limit - pos);
+ iovidx++;
+ }
+
+ ssize_t res;
+ int err;
+ do {
+ res = writev(fd, iov, length);
+ // keep on writing if it was interrupted
+ } while(res == -1 && ((err = errno) == EINTR));
+
+ if (res < 0) {
+ if (err == EAGAIN || err == EWOULDBLOCK) {
+ // network stack is saturated we will try again later
+ return 0;
+ }
+ if (err == EBADF) {
+ throwClosedChannelException(env);
+ return -1;
+ }
+ throwIOException(env, exceptionMessage("Error while write(...): ", err));
+ return -1;
+ }
+ return res;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit) {
+ // TODO: We could also maybe pass the address in directly and so eliminate this call
+ // not sure if this would buy us much. So some testing is needed.
+ void *buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
+ if (buffer == NULL) {
+ throwRuntimeException(env, "Unable to access address of buffer");
+ return -1;
+ }
+ ssize_t res;
+ int err;
+ do {
+ res = read(fd, buffer + pos, (size_t) (limit - pos));
+ // Keep on reading if we was interrupted
+ } while (res == -1 && ((err = errno) == EINTR));
+
+ if (res < 0) {
+ if (err == EAGAIN || err == EWOULDBLOCK) {
+ // Nothing left to read
+ return 0;
+ }
+ if (err == EBADF) {
+ throwClosedChannelException(env);
+ return -1;
+ }
+ throwIOException(env, exceptionMessage("Error while read(...): ", err));
+ return -1;
+ }
+
+ if (res == 0) {
+ // end-of-stream
+ return -1;
+ }
+ return (jint) res;
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd) {
+ if (close(fd) < 0) {
+ throwIOException(env, "Error closing file descriptor");
+ }
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write) {
+ int mode;
+ if (read && write) {
+ mode = SHUT_RDWR;
+ } else if (read) {
+ mode = SHUT_RD;
+ } else if (write) {
+ mode = SHUT_WR;
+ }
+ if (shutdown(fd, mode) < 0) {
+ throwIOException(env, "Error shutdown socket file descriptor");
+ }
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz) {
+ // TODO: Maybe also respect -Djava.net.preferIPv4Stack=true
+ int fd = socket(socketType, SOCK_STREAM | SOCK_NONBLOCK, 0);
+ if (fd == -1) {
+ int err = errno;
+ throwIOException(env, exceptionMessage("Error creating socket: ", err));
+ return -1;
+ } else if (socketType == AF_INET6){
+ // Allow to listen /connect ipv4 and ipv6
+ int optval = 0;
+ if (setOption(env, fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)) < 0) {
+ // Something went wrong so close the fd and return here. setOption(...) itself throws the exception already.
+ close(fd);
+ return -1;
+ }
+ }
+ return fd;
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
+ struct sockaddr_storage addr;
+ init_sockaddr(env, address, scopeId, port, &addr);
+
+ if(bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1){
+ int err = errno;
+ throwIOException(env, exceptionMessage("Error during bind(...): ", err));
+ }
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog) {
+ if(listen(fd, backlog) == -1) {
+ int err = errno;
+ throwIOException(env, exceptionMessage("Error during listen(...): ", err));
+ }
+}
+
+JNIEXPORT jboolean JNICALL Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port) {
+ struct sockaddr_storage addr;
+ init_sockaddr(env, address, scopeId, port, &addr);
+
+ int res;
+ int err;
+ do {
+ res = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
+ } while (res == -1 && ((err = errno) == EINTR));
+
+ if (res < 0) {
+ if (err == EINPROGRESS) {
+ // connect not complete yet need to wait for EPOLLOUT event
+ return JNI_FALSE;
+ }
+ throwIOException(env, exceptionMessage("Unable to connect to remote host: ", err));
+
+ return JNI_FALSE;
+ }
+ return JNI_TRUE;
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd) {
+ // connect done, check for error
+ int optval;
+ int res = getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval));
+ if (res == 0) {
+ return;
+ }
+ throwIOException(env, exceptionMessage("Unable to connect to remote host: ", optval));
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd) {
+ jint socketFd;
+ int err;
+
+ do {
+ if (accept4) {
+ socketFd = accept4(fd, NULL, 0, SOCK_NONBLOCK | SOCK_CLOEXEC);
+ } else {
+ socketFd = accept(fd, NULL, 0);
+ }
+ } while (socketFd == -1 && ((err = errno) == EINTR));
+
+ if (socketFd == -1) {
+ if (err == EAGAIN || err == EWOULDBLOCK) {
+ // Everything consumed so just return -1 here.
+ return -1;
+ } else {
+ throwIOException(env, exceptionMessage("Error during accept(...): ", err));
+ return -1;
+ }
+ }
+ if (accept4) {
+ return socketFd;
+ } else {
+ // accept4 was not present so need two more sys-calls ...
+ if (fcntl(socketFd, F_SETFD, FD_CLOEXEC) == -1) {
+ throwIOException(env, exceptionMessage("Error during accept(...): ", err));
+ return -1;
+ }
+ if (fcntl(socketFd, F_SETFL, O_NONBLOCK) == -1) {
+ throwIOException(env, exceptionMessage("Error during accept(...): ", err));
+ return -1;
+ }
+ }
+ return socketFd;
+}
+
+JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len) {
+ jobject fileChannel = (*env)->GetObjectField(env, fileRegion, fileChannelFieldId);
+ if (fileChannel == NULL) {
+ throwRuntimeException(env, "Unable to obtain FileChannel from FileRegion");
+ return -1;
+ }
+ jobject fileDescriptor = (*env)->GetObjectField(env, fileChannel, fileDescriptorFieldId);
+ if (fileDescriptor == NULL) {
+ throwRuntimeException(env, "Unable to obtain FileDescriptor from FileChannel");
+ return -1;
+ }
+ jint srcFd = (*env)->GetIntField(env, fileDescriptor, fdFieldId);
+ if (srcFd == -1) {
+ throwRuntimeException(env, "Unable to obtain the fd from the FileDescriptor");
+ return -1;
+ }
+ ssize_t res;
+ off_t offset = off;
+ int err;
+ do {
+ res = sendfile(fd, srcFd, &offset, (size_t) len);
+ } while (res == -1 && ((err = errno) == EINTR));
+ if (res < 0) {
+ if (err == EAGAIN) {
+ return 0;
+ }
+ throwIOException(env, exceptionMessage("Error during accept(...): ", err));
+ return -1;
+ }
+ if (res > 0) {
+ // update the transfered field in DefaultFileRegion
+ (*env)->SetLongField(env, fileRegion, transferedFieldId, off + res);
+ }
+
+ return res;
+}
+
+JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd) {
+ socklen_t len;
+ struct sockaddr_storage addr;
+
+ len = sizeof addr;
+ if (getpeername(fd, (struct sockaddr*)&addr, &len) == -1) {
+ return NULL;
+ }
+ return createInetSocketAddress(env, addr);
+}
+
+JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd) {
+ socklen_t len;
+ struct sockaddr_storage addr;
+
+ len = sizeof addr;
+ if (getsockname(fd, (struct sockaddr*)&addr, &len) == -1) {
+ return NULL;
+ }
+ return createInetSocketAddress(env, addr);
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ setOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval));
+}
+
+JNIEXPORT void JNICALL Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval) {
+ struct linger solinger;
+ if (optval < 0) {
+ solinger.l_onoff = 0;
+ solinger.l_linger = 0;
+ } else {
+ solinger.l_onoff = 1;
+ solinger.l_linger = optval;
+ }
+ setOption(env, fd, SOL_SOCKET, SO_LINGER, &solinger, sizeof(solinger));
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd) {
+ int optval;
+ if (getOption(env, fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ return optval;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd) {
+ int optval;
+ if (getOption(env, fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ return optval;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd) {
+ int optval;
+ if (getOption(env, fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ return optval;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd) {
+ int optval;
+ if (getOption(env, fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ return optval;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd) {
+ int optval;
+ if (getOption(env, fd, SOL_TCP, TCP_CORK, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ return optval;
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd) {
+ struct linger optval;
+ if (getOption(env, fd, SOL_SOCKET, SO_LINGER, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ if (optval.l_onoff == 0) {
+ return -1;
+ } else {
+ return optval.l_linger;
+ }
+}
+
+JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd) {
+ int optval;
+ if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
+ return -1;
+ }
+ return optval;
+}
+
View
60 transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.h
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+#include <jni.h>
+
+
+#define EPOLL_READ 0x01
+#define EPOLL_WRITE 0x02
+#define EPOLL_ACCEPT 0x04
+#define EPOLL_RDHUP 0x08
+
+jint Java_io_netty_channel_epoll_Native_eventFd(JNIEnv * env, jclass clazz);
+void Java_io_netty_channel_epoll_Native_eventFdWrite(JNIEnv * env, jclass clazz, jint fd, jlong value);
+void Java_io_netty_channel_epoll_Native_eventFdRead(JNIEnv * env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_epollCreate(JNIEnv * env, jclass clazz);
+jint Java_io_netty_channel_epoll_Native_epollWait(JNIEnv * env, jclass clazz, jint efd, jlongArray events, jint timeout);
+void Java_io_netty_channel_epoll_Native_epollCtlAdd(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id);
+void Java_io_netty_channel_epoll_Native_epollCtlMod(JNIEnv * env, jclass clazz, jint efd, jint fd, jint flags, jint id);
+void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz, jint efd, jint fd);
+jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
+jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
+jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
+void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd);
+void Java_io_netty_channel_epoll_Native_shutdown(JNIEnv * env, jclass clazz, jint fd, jboolean read, jboolean write);
+jint Java_io_netty_channel_epoll_Native_socket(JNIEnv * env, jclass clazz);
+void Java_io_netty_channel_epoll_Native_bind(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port);
+void Java_io_netty_channel_epoll_Native_listen(JNIEnv * env, jclass clazz, jint fd, jint backlog);
+jboolean Java_io_netty_channel_epoll_Native_connect(JNIEnv * env, jclass clazz, jint fd, jbyteArray address, jint scopeId, jint port);
+void Java_io_netty_channel_epoll_Native_finishConnect(JNIEnv * env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_accept(JNIEnv * env, jclass clazz, jint fd);
+jlong Java_io_netty_channel_epoll_Native_sendfile(JNIEnv *env, jclass clazz, jint fd, jobject fileRegion, jlong off, jlong len);
+jobject Java_io_netty_channel_epoll_Native_remoteAddress(JNIEnv * env, jclass clazz, jint fd);
+jobject Java_io_netty_channel_epoll_Native_localAddress(JNIEnv * env, jclass clazz, jint fd);
+void Java_io_netty_channel_epoll_Native_setReuseAddress(JNIEnv * env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setTcpNoDelay(JNIEnv *env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setReceiveBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setSendBufferSize(JNIEnv *env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setKeepAlive(JNIEnv *env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setTcpCork(JNIEnv *env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setSoLinger(JNIEnv *env, jclass clazz, jint fd, jint optval);
+void Java_io_netty_channel_epoll_Native_setTrafficClass(JNIEnv *env, jclass clazz, jint fd, jint optval);
+jint Java_io_netty_channel_epoll_Native_isReuseAddresss(JNIEnv *env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_isTcpNoDelay(JNIEnv *env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_getReceiveBufferSize(JNIEnv * env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_getSendBufferSize(JNIEnv *env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_isTcpCork(JNIEnv *env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv *env, jclass clazz, jint fd);
+jint Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv *env, jclass clazz, jint fd);
View
163 transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.EventLoop;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+abstract class AbstractEpollChannel extends AbstractChannel {
+ private static final ChannelMetadata DATA = new ChannelMetadata(false);
+ private final int readFlag;
+ protected int flags;
+ protected volatile boolean active;
+ volatile int fd;
+ int id;
+
+ AbstractEpollChannel(Channel parent, EventLoop eventLoop, int fd, int flag) {
+ super(parent, eventLoop);
+ this.fd = fd;
+ readFlag = flag;
+ flags |= flag;
+ }
+
+ AbstractEpollChannel(EventLoop eventLoop, int flag) {
+ this(null, eventLoop, socketFd(), flag);
+ }
+
+ private static int socketFd() {
+ try {
+ return Native.socket();
+ } catch (IOException e) {
+ throw new ChannelException(e);
+ }
+ }
+
+ @Override
+ public boolean isActive() {
+ return active;
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return DATA;
+ }
+
+ @Override
+ protected void doClose() throws Exception {
+ active = false;
+ int fd = this.fd;
+ this.fd = -1;
+ Native.close(fd);
+ }
+
+ @Override
+ public InetSocketAddress remoteAddress() {
+ return (InetSocketAddress) super.remoteAddress();
+ }
+
+ @Override
+ public InetSocketAddress localAddress() {
+ return (InetSocketAddress) super.localAddress();
+ }
+
+ @Override
+ protected void doDisconnect() throws Exception {
+ doClose();
+ }
+
+ @Override
+ protected boolean isCompatible(EventLoop loop) {
+ return loop instanceof EpollEventLoop;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return fd != -1;
+ }
+
+ @Override
+ protected void doDeregister() throws Exception {
+ ((EpollEventLoop) eventLoop()).remove(this);
+ }
+
+ @Override
+ protected void doBeginRead() throws Exception {
+ if ((flags & readFlag) == 0) {
+ flags |= readFlag;
+ ((EpollEventLoop) eventLoop()).modify(this);
+ }
+ }
+
+ protected final void clearEpollIn() {
+ if ((flags & readFlag) != 0) {
+ flags = ~readFlag;
+ ((EpollEventLoop) eventLoop()).modify(this);
+ }
+ }
+
+ @Override
+ protected void doRegister() throws Exception {
+ EpollEventLoop loop = (EpollEventLoop) eventLoop();
+ loop.add(this);
+ }
+
+ @Override
+ protected abstract AbstractEpollUnsafe newUnsafe();
+
+ protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
+
+ /**
+ * Called once EPOLLIN event is ready to be processed
+ */
+ abstract void epollInReady();
+
+ /**
+ * Called once EPOLLRDHUP event is ready to be processed
+ */
+ void epollRdHupReady() {
+ // NOOP
+ }
+
+ @Override
+ protected void flush0() {
+ // Flush immediately only when there's no pending flush.
+ // If there's a pending flush operation, event loop will call forceFlush() later,
+ // and thus there's no need to call it now.
+ if (isFlushPending()) {
+ return;
+ }
+ super.flush0();
+ }
+
+ /**
+ * Called once a EPOLLOUT event is ready to be processed
+ */
+ void epollOutReady() {
+ // directly call super.flush0() to force a flush now
+ super.flush0();
+ }
+
+ private boolean isFlushPending() {
+ return (flags & Native.EPOLLOUT) != 0;
+ }
+ }
+}
View
27 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelOption.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.channel.ChannelOption;
+
+public final class EpollChannelOption {
+ private static final Class<EpollChannelOption> T = EpollChannelOption.class;
+
+ public static final ChannelOption<Boolean> TCP_CORK = ChannelOption.valueOf(T, "TCP_CORK");
+
+ private EpollChannelOption() { }
+
+}
View
356 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java
@@ -0,0 +1,356 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
+ */
+final class EpollEventLoop extends SingleThreadEventLoop {
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
+ private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER;
+
+ static {
+ AtomicIntegerFieldUpdater<EpollEventLoop> updater =
+ PlatformDependent.newAtomicIntegerFieldUpdater(EpollEventLoop.class, "wakenUp");
+ if (updater == null) {
+ updater = AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
+ }
+ WAKEN_UP_UPDATER = updater;
+ }
+
+ private final int epollFd;
+ private final int eventFd;
+ private final Map<Integer, AbstractEpollChannel> ids = new HashMap<Integer, AbstractEpollChannel>();
+ private final long[] events;
+
+ private int id;
+ private int oldWakenUp;
+ private boolean overflown;
+
+ @SuppressWarnings("unused")
+ private volatile int wakenUp;
+ private volatile int ioRatio = 50;
+
+ EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents) {
+ super(parent, executor, false);
+ events = new long[maxEvents];
+ boolean success = false;
+ int epollFd = -1;
+ int eventFd = -1;
+ try {
+ this.epollFd = epollFd = Native.epollCreate();
+ this.eventFd = eventFd = Native.eventFd();
+ Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN, 0);
+ success = true;
+ } finally {
+ if (!success) {
+ if (epollFd != -1) {
+ try {
+ Native.close(epollFd);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ if (eventFd != -1) {
+ try {
+ Native.close(eventFd);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+
+ private int nextId() {
+ int id = this.id;
+ if (id == Integer.MAX_VALUE) {
+ overflown = true;
+ id = 0;
+ }
+ if (overflown) {
+ // the ids had an overflow before so we need to make sure the id is not in use atm before assign
+ // it.
+ for (;;) {
+ if (!ids.containsKey(++id)) {
+ this.id = id;
+ break;
+ }
+ }
+ } else {
+ this.id = ++id;
+ }
+ return id;
+ }
+
+ @Override
+ protected void wakeup(boolean inEventLoop) {
+ if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
+ // write to the evfd which will then wake-up epoll_wait(...)
+ Native.eventFdWrite(eventFd, 1L);
+ }
+ }
+
+ /**
+ * Register the given epoll with this {@link io.netty.channel.EventLoop}.
+ */
+ void add(AbstractEpollChannel ch) {
+ assert inEventLoop();
+ int id = nextId();
+ Native.epollCtlAdd(epollFd, ch.fd, ch.flags, id);
+ ch.id = id;
+ ids.put(id, ch);
+ }
+
+ /**
+ * The flags of the given epoll was modified so update the registration
+ */
+ void modify(AbstractEpollChannel ch) {
+ assert inEventLoop();
+ Native.epollCtlMod(epollFd, ch.fd, ch.flags, ch.id);
+ }
+
+ /**
+ * Deregister the given epoll from this {@link io.netty.channel.EventLoop}.
+ */
+ void remove(AbstractEpollChannel ch) {
+ assert inEventLoop();
+ if (ids.remove(ch.id) != null && ch.isOpen()) {
+ // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
+ // removed once the file-descriptor is closed.
+ Native.epollCtlDel(epollFd, ch.fd);
+ }
+ }
+
+ @Override
+ protected Queue<Runnable> newTaskQueue() {
+ // This event loop never calls takeTask()
+ return new ConcurrentLinkedQueue<Runnable>();
+ }
+
+ /**
+ * Returns the percentage of the desired amount of time spent for I/O in the event loop.
+ */
+ public int getIoRatio() {
+ return ioRatio;
+ }
+
+ /**
+ * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
+ * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
+ */
+ public void setIoRatio(int ioRatio) {
+ if (ioRatio <= 0 || ioRatio > 100) {
+ throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
+ }
+ this.ioRatio = ioRatio;
+ }
+
+ private int epollWait() {
+ int selectCnt = 0;
+ long currentTimeNanos = System.nanoTime();
+ long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
+ for (;;) {
+ long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
+ if (timeoutMillis <= 0) {
+ if (selectCnt == 0) {
+ int ready = Native.epollWait(epollFd, events, 0);
+ if (ready > 0) {
+ return ready;
+ }
+ }
+ break;
+ }
+
+ int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis);
+ selectCnt ++;
+
+ if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) {
+ // Selected something,
+ // waken up by user, or
+ // the task queue has a pending task.
+ return selectedKeys;
+ }
+ currentTimeNanos = System.nanoTime();
+ }
+ return 0;
+ }
+
+ @Override
+ protected void run() {
+ for (;;) {
+ oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0);
+ try {
+ int ready;
+ if (hasTasks()) {
+ // Non blocking just return what is ready directly without block
+ ready = Native.epollWait(epollFd, events, 0);
+ } else {
+ ready = epollWait();
+
+ // 'wakenUp.compareAndSet(false, true)' is always evaluated
+ // before calling 'selector.wakeup()' to reduce the wake-up
+ // overhead. (Selector.wakeup() is an expensive operation.)
+ //
+ // However, there is a race condition in this approach.
+ // The race condition is triggered when 'wakenUp' is set to
+ // true too early.
+ //
+ // 'wakenUp' is set to true too early if:
+ // 1) Selector is waken up between 'wakenUp.set(false)' and
+ // 'selector.select(...)'. (BAD)
+ // 2) Selector is waken up between 'selector.select(...)' and
+ // 'if (wakenUp.get()) { ... }'. (OK)
+ //
+ // In the first case, 'wakenUp' is set to true and the
+ // following 'selector.select(...)' will wake up immediately.
+ // Until 'wakenUp' is set to false again in the next round,
+ // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
+ // any attempt to wake up the Selector will fail, too, causing
+ // the following 'selector.select(...)' call to block
+ // unnecessarily.
+ //
+ // To fix this problem, we wake up the selector again if wakenUp
+ // is true immediately after selector.select(...).
+ // It is inefficient in that it wakes up the selector for both
+ // the first case (BAD - wake-up required) and the second case
+ // (OK - no wake-up required).
+
+ if (wakenUp == 1) {
+ Native.eventFdWrite(eventFd, 1L);
+ }
+ }
+
+ final int ioRatio = this.ioRatio;
+ if (ioRatio == 100) {
+ if (ready > 0) {
+ processReady(events, ready);
+ }
+ runAllTasks();
+ } else {
+ final long ioStartTime = System.nanoTime();
+
+ if (ready > 0) {
+ processReady(events, ready);
+ }
+
+ final long ioTime = System.nanoTime() - ioStartTime;
+ runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
+ }
+
+ if (isShuttingDown()) {
+ closeAll();
+ if (confirmShutdown()) {
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception in the selector loop.", t);
+
+ // Prevent possible consecutive immediate failures that lead to
+ // excessive CPU consumption.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ }
+ }
+
+ private void closeAll() {
+ int ready = Native.epollWait(epollFd, events, 0);
+ Collection<AbstractEpollChannel> channels = new ArrayList<AbstractEpollChannel>(ready);
+
+ for (int i = 0; i < ready; i++) {
+ final long ev = events[i];
+
+ int id = (int) (ev >> 32L);
+ AbstractEpollChannel ch = ids.get(id);
+ if (ch != null) {
+ channels.add(ids.get(id));
+ }
+ }
+
+ for (AbstractEpollChannel ch: channels) {
+ ch.unsafe().close(ch.unsafe().voidPromise());
+ }
+ }
+
+ private void processReady(long[] events, int ready) {
+ for (int i = 0; i < ready; i ++) {
+ final long ev = events[i];
+
+ int id = (int) (ev >> 32L);
+ if (id == 0) {
+ // consume wakeup event
+ Native.eventFdRead(eventFd);
+ } else {
+ boolean read = (ev & Native.EPOLLIN) != 0;
+ boolean write = (ev & Native.EPOLLOUT) != 0;
+ boolean close = (ev & Native.EPOLLRDHUP) != 0;
+
+ AbstractEpollChannel ch = ids.get(id);
+ if (ch != null) {
+ AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
+ if (write) {
+ // force flush of data as the epoll is writable again
+ unsafe.epollOutReady();
+ }
+ if (read) {
+ // Something is ready to read, so consume it now
+ unsafe.epollInReady();
+ }
+ if (close) {
+ unsafe.epollRdHupReady();
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void cleanup() {
+ try {
+ Native.close(epollFd);
+ } catch (IOException e) {
+ logger.warn("Failed to close the epoll fd.", e);
+ }
+ try {
+ Native.close(eventFd);
+ } catch (IOException e) {
+ logger.warn("Failed to close the event fd.", e);
+ }
+ }
+}
View
75 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultithreadEventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * {@link EventLoopGroup} which uses epoll under the covers. Because of this
+ * it only works on linux.
+ */
+public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
+
+ /**
+ * Create a new instance using the default number of threads and the default {@link ThreadFactory}.
+ */
+ public EpollEventLoopGroup() {
+ this(0);
+ }
+
+ /**
+ * Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
+ */
+ public EpollEventLoopGroup(int nThreads) {
+ this(nThreads, null);
+ }
+
+ /**
+ * Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
+ */
+ public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
+ this(nThreads, threadFactory, 128);
+ }
+
+ /**
+ * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
+ * maximal amount of epoll events to handle per epollWait(...).
+ */
+ public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
+ super(nThreads, threadFactory, maxEventsAtOnce);
+ }
+
+ /**
+ * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
+ * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
+ */
+ public void setIoRatio(int ioRatio) {
+ for (EventExecutor e: children()) {
+ ((EpollEventLoop) e).setIoRatio(ioRatio);
+ }
+ }
+
+ @Override
+ protected EventLoop newChild(Executor executor, Object... args) throws Exception {
+ return new EpollEventLoop(this, executor, (Integer) args[0]);
+ }
+}
View
133 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.ServerSocketChannelConfig;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * {@link ServerSocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
+ * maximal performance.
+ */
+public final class EpollServerSocketChannel extends AbstractEpollChannel implements ServerSocketChannel {
+
+ private final EpollServerSocketChannelConfig config;
+ private final EventLoopGroup childGroup;
+ private volatile InetSocketAddress local;
+
+ public EpollServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
+ super(eventLoop, Native.EPOLLACCEPT);
+ config = new EpollServerSocketChannelConfig(this);
+ this.childGroup = childGroup;
+ }
+
+ @Override
+ protected boolean isCompatible(EventLoop loop) {
+ return loop instanceof EpollEventLoop;
+ }
+
+ @Override
+ protected void doBind(SocketAddress localAddress) throws Exception {
+ InetSocketAddress addr = (InetSocketAddress) localAddress;
+ Native.bind(fd, addr.getAddress(), addr.getPort());
+ local = addr;
+ Native.listen(fd, config.getBacklog());
+ active = true;
+ }
+
+ @Override
+ public ServerSocketChannelConfig config() {
+ return config;
+ }
+
+ @Override
+ protected InetSocketAddress localAddress0() {
+ return local;
+ }
+
+ @Override
+ protected InetSocketAddress remoteAddress0() {
+ return null;
+ }
+
+ @Override
+ protected AbstractEpollUnsafe newUnsafe() {
+ return new EpollServerSocketUnsafe();
+ }
+
+ @Override
+ protected void doWrite(ChannelOutboundBuffer in) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public EventLoopGroup childEventLoopGroup() {
+ return childGroup;
+ }
+
+ final class EpollServerSocketUnsafe extends AbstractEpollUnsafe {
+ @Override
+ public void connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
+ // Connect not supported by ServerChannel implementations
+ channelPromise.setFailure(new UnsupportedOperationException());
+ }
+
+ @Override
+ void epollInReady() {
+ assert eventLoop().inEventLoop();
+ try {
+ final ChannelPipeline pipeline = pipeline();
+ Throwable exception = null;
+ try {
+ for (;;) {
+ int socketFd = Native.accept(fd);
+ if (socketFd == -1) {
+ // this means everything was handled for now
+ break;
+ }
+ try {
+ pipeline.fireChannelRead(new EpollSocketChannel(EpollServerSocketChannel.this,
+ childEventLoopGroup().next(), socketFd));
+ } catch (Throwable t) {
+ // keep on reading as we use epoll ET and need to consume everything from the socket
+ pipeline.fireChannelReadComplete();
+ pipeline.fireExceptionCaught(t);
+ }
+ }
+ } catch (Throwable t) {
+ exception = t;
+ }
+ pipeline.fireChannelReadComplete();
+
+ if (exception != null) {
+ pipeline.fireExceptionCaught(exception);
+ }
+ } finally {
+ if (!config().isAutoRead()) {
+ clearEpollIn();
+ }
+ }
+ }
+ }
+}
View
176 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.MessageSizeEstimator;
+import io.netty.channel.RecvByteBufAllocator;
+import io.netty.channel.socket.ServerSocketChannelConfig;
+import io.netty.util.NetUtil;
+
+import java.util.Map;
+
+import static io.netty.channel.ChannelOption.SO_BACKLOG;
+import static io.netty.channel.ChannelOption.SO_RCVBUF;
+import static io.netty.channel.ChannelOption.SO_REUSEADDR;
+
+final class EpollServerSocketChannelConfig extends DefaultChannelConfig
+ implements ServerSocketChannelConfig {
+
+ private final EpollServerSocketChannel channel;
+ private volatile int backlog = NetUtil.SOMAXCONN;
+
+ EpollServerSocketChannelConfig(EpollServerSocketChannel channel) {
+ super(channel);
+ this.channel = channel;
+ }
+
+ @Override
+ public Map<ChannelOption<?>, Object> getOptions() {
+ return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getOption(ChannelOption<T> option) {
+ if (option == SO_RCVBUF) {
+ return (T) Integer.valueOf(getReceiveBufferSize());
+ }
+ if (option == SO_REUSEADDR) {
+ return (T) Boolean.valueOf(isReuseAddress());
+ }
+ if (option == SO_BACKLOG) {
+ return (T) Integer.valueOf(getBacklog());
+ }
+
+ return super.getOption(option);
+ }
+
+ @Override
+ public <T> boolean setOption(ChannelOption<T> option, T value) {
+ validate(option, value);
+
+ if (option == SO_RCVBUF) {
+ setReceiveBufferSize((Integer) value);
+ } else if (option == SO_REUSEADDR) {
+ setReuseAddress((Boolean) value);
+ } else if (option == SO_BACKLOG) {
+ setBacklog((Integer) value);
+ } else {
+ return super.setOption(option, value);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean isReuseAddress() {
+ return Native.isReuseAddress(channel.fd) == 1;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
+ Native.setReuseAddress(channel.fd, reuseAddress ? 1 : 0);
+ return this;
+ }
+
+ @Override
+ public int getReceiveBufferSize() {
+ return Native.getReceiveBufferSize(channel.fd);
+ }
+
+ @Override
+ public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
+ Native.setReceiveBufferSize(channel.fd, receiveBufferSize);
+
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
+ return this;
+ }
+
+ @Override
+ public int getBacklog() {
+ return backlog;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setBacklog(int backlog) {
+ if (backlog < 0) {
+ throw new IllegalArgumentException("backlog: " + backlog);
+ }
+ this.backlog = backlog;
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
+ super.setConnectTimeoutMillis(connectTimeoutMillis);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
+ super.setMaxMessagesPerRead(maxMessagesPerRead);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
+ super.setWriteSpinCount(writeSpinCount);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
+ super.setAllocator(allocator);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
+ super.setRecvByteBufAllocator(allocator);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
+ super.setAutoRead(autoRead);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+ super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+ super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
+ return this;
+ }
+
+ @Override
+ public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
+ super.setMessageSizeEstimator(estimator);
+ return this;
+ }
+}
View
590 transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java
@@ -0,0 +1,590 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.epoll;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.EventLoop;
+import io.netty.channel.RecvByteBufAllocator;
+import io.netty.channel.socket.ChannelInputShutdownEvent;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.internal.StringUtil;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/*