Skip to content

Commit

Permalink
Fix ClassCastException and native crash when using kqueue transport. (#…
Browse files Browse the repository at this point in the history
…8665)

Motivation:

How we did the mapping from native code to AbstractKQueueChannel was not safe and could lead to heap corruption. This then sometimes produced ClassCastExceptions or could also lead to crashes. This happened sometimes when running the testsuite.

Modifications:

Use a Map for the mapping (just as we do in the native epoll transport).

Result:

No more heap corruption / crashes.
  • Loading branch information
normanmaurer committed Dec 19, 2018
1 parent b12c331 commit 5ecb34e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 128 deletions.
88 changes: 5 additions & 83 deletions transport-native-kqueue/src/main/c/netty_kqueue_eventarray.c
Expand Up @@ -24,104 +24,26 @@
#include "netty_unix_jni.h"
#include "netty_unix_util.h"

static jfieldID kqueueJniPtrFieldId = NULL;

static void netty_kqueue_eventarray_evSet(JNIEnv* env, jclass clzz, jlong keventAddress, jobject channel, jint ident, jshort filter, jshort flags, jint fflags) {
// Create a global pointer, cast it as a long, and retain it in java to re-use and free later.
jlong jniSelfPtr = (*env)->GetLongField(env, channel, kqueueJniPtrFieldId);
if (jniSelfPtr == 0) {
jniSelfPtr = (jlong) (*env)->NewGlobalRef(env, channel);
(*env)->SetLongField(env, channel, kqueueJniPtrFieldId, jniSelfPtr);
} else if ((flags & EV_DELETE) != 0) {
// If the event is deleted, make sure it no longer has a reference to the jniSelfPtr because it shouldn't be used after this point.
jniSelfPtr = 0;
}
EV_SET((struct kevent*) keventAddress, ident, filter, flags, fflags, 0, (jobject) jniSelfPtr);
}

static jobject netty_kqueue_eventarray_getChannel(JNIEnv* env, jclass clazz, jlong keventAddress) {
struct kevent* event = (struct kevent*) keventAddress;
return event->udata == NULL ? NULL : (jobject) event->udata;
}

static void netty_kqueue_eventarray_deleteGlobalRefs(JNIEnv* env, jclass clazz, jlong channelAddressStart, jlong channelAddressEnd) {
// Iterate over an array of longs, which are really pointers to the jobject NewGlobalRef created above in evSet
// and delete each one. The field has already been set to 0 in java.
jlong* itr = (jlong*) channelAddressStart;
const jlong* end = (jlong*) channelAddressEnd;
for (; itr != end; ++itr) {
(*env)->DeleteGlobalRef(env, (jobject) *itr);
}
static void netty_kqueue_eventarray_evSet(JNIEnv* env, jclass clzz, jlong keventAddress, jint ident, jshort filter, jshort flags, jint fflags) {
EV_SET((struct kevent*) keventAddress, ident, filter, flags, fflags, 0, NULL);
}

// JNI Method Registration Table Begin
static const JNINativeMethod fixed_method_table[] = {
{ "deleteGlobalRefs", "(JJ)V", (void *) netty_kqueue_eventarray_deleteGlobalRefs }
// "evSet" has a dynamic signature
// "getChannel" has a dynamic signature
{ "evSet", "(JISSI)V", (void *) netty_kqueue_eventarray_evSet }
};
static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]);

static jint dynamicMethodsTableSize() {
return fixed_method_table_size + 2;
}

static JNINativeMethod* createDynamicMethodsTable(const char* packagePrefix) {
JNINativeMethod* dynamicMethods = malloc(sizeof(JNINativeMethod) * dynamicMethodsTableSize());
memcpy(dynamicMethods, fixed_method_table, sizeof(fixed_method_table));
char* dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel;ISSI)V");
JNINativeMethod* dynamicMethod = &dynamicMethods[fixed_method_table_size];
dynamicMethod->name = "evSet";
dynamicMethod->signature = netty_unix_util_prepend("(JL", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_kqueue_eventarray_evSet;
free(dynamicTypeName);

++dynamicMethod;
dynamicTypeName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel;");
dynamicMethod->name = "getChannel";
dynamicMethod->signature = netty_unix_util_prepend("(J)L", dynamicTypeName);
dynamicMethod->fnPtr = (void *) netty_kqueue_eventarray_getChannel;
free(dynamicTypeName);
return dynamicMethods;
}

static void freeDynamicMethodsTable(JNINativeMethod* dynamicMethods) {
jint fullMethodTableSize = dynamicMethodsTableSize();
jint i = fixed_method_table_size;
for (; i < fullMethodTableSize; ++i) {
free(dynamicMethods[i].signature);
}
free(dynamicMethods);
}
// JNI Method Registration Table End

jint netty_kqueue_eventarray_JNI_OnLoad(JNIEnv* env, const char* packagePrefix) {
JNINativeMethod* dynamicMethods = createDynamicMethodsTable(packagePrefix);
if (netty_unix_util_register_natives(env,
packagePrefix,
"io/netty/channel/kqueue/KQueueEventArray",
dynamicMethods,
dynamicMethodsTableSize()) != 0) {
freeDynamicMethodsTable(dynamicMethods);
return JNI_ERR;
}
freeDynamicMethodsTable(dynamicMethods);
dynamicMethods = NULL;

char* nettyClassName = netty_unix_util_prepend(packagePrefix, "io/netty/channel/kqueue/AbstractKQueueChannel");
jclass kqueueChannelCls = (*env)->FindClass(env, nettyClassName);
free(nettyClassName);
nettyClassName = NULL;
if (kqueueChannelCls == NULL) {
fixed_method_table,
fixed_method_table_size) != 0) {
return JNI_ERR;
}

kqueueJniPtrFieldId = (*env)->GetFieldID(env, kqueueChannelCls, "jniSelfPtr", "J");
if (kqueueJniPtrFieldId == NULL) {
netty_unix_errors_throwRuntimeException(env, "failed to get field ID: AbstractKQueueChannel.jniSelfPtr");
return JNI_ERR;
}

return NETTY_JNI_VERSION;
}

Expand Down
Expand Up @@ -69,15 +69,6 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
private boolean writeFilterEnabled;
boolean readReadyRunnablePending;
boolean inputClosedSeenErrorOnRead;
/**
* This member variable means we don't have to have a map in {@link KQueueEventLoop} which associates the FDs
* from kqueue to instances of this class. This field will be initialized by JNI when modifying kqueue events.
* If there is no global reference when JNI gets a kqueue evSet call (aka this field is 0) then a global reference
* will be created and the address will be saved in this member variable. Then when we process a kevent in Java
* we can ask JNI to give us the {@link AbstractKQueueChannel} that corresponds to that event.
*/
long jniSelfPtr;

protected volatile boolean active;
private volatile SocketAddress local;
private volatile SocketAddress remote;
Expand Down Expand Up @@ -213,6 +204,9 @@ protected void doRegister() throws Exception {
// make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
readReadyRunnablePending = false;

((KQueueEventLoop) eventLoop()).add(this);

// Add the write event first so we get notified of connection refused on the client side!
if (writeFilterEnabled) {
evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
Expand Down
Expand Up @@ -79,7 +79,7 @@ void clear() {

void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
reallocIfNeeded();
evSet(getKEventOffset(size++) + memoryAddress, ch, ch.socket.intValue(), filter, flags, fflags);
evSet(getKEventOffset(size++) + memoryAddress, ch.socket.intValue(), filter, flags, fflags);
}

private void reallocIfNeeded() {
Expand Down Expand Up @@ -165,16 +165,9 @@ long data(int index) {
return memory.getLong(getKEventOffset(index) + KQUEUE_DATA_OFFSET);
}

AbstractKQueueChannel channel(int index) {
return getChannel(getKEventOffsetAddress(index));
}

private static int calculateBufferCapacity(int capacity) {
return capacity * KQUEUE_EVENT_SIZE;
}

private static native void evSet(long keventAddress, AbstractKQueueChannel ch,
int ident, short filter, short flags, int fflags);
private static native AbstractKQueueChannel getChannel(long keventAddress);
static native void deleteGlobalRefs(long channelAddressStart, long channelAddressEnd);
private static native void evSet(long keventAddress, int ident, short filter, short flags, int fflags);
}
Expand Up @@ -23,6 +23,8 @@
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray;
import io.netty.util.IntSupplier;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
Expand All @@ -31,11 +33,9 @@

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static io.netty.channel.kqueue.KQueueEventArray.deleteGlobalRefs;
import static java.lang.Math.min;

/**
Expand All @@ -53,7 +53,6 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
KQueue.ensureAvailability();
}

private final NativeLongArray jniChannelPointers;
private final boolean allowGrowing;
private final FileDescriptor kqueueFd;
private final KQueueEventArray changeList;
Expand All @@ -66,6 +65,7 @@ public int get() throws Exception {
return kqueueWaitNow();
}
};
private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);

private volatile int wakenUp;
private volatile int ioRatio = 50;
Expand All @@ -83,26 +83,25 @@ public int get() throws Exception {
}
changeList = new KQueueEventArray(maxEvents);
eventList = new KQueueEventArray(maxEvents);
jniChannelPointers = new NativeLongArray(4096);
int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
if (result < 0) {
cleanup();
throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
}
}

void add(AbstractKQueueChannel ch) {
assert inEventLoop();
channels.put(ch.fd().intValue(), ch);
}

void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
changeList.evSet(ch, filter, flags, fflags);
}

void remove(AbstractKQueueChannel ch) throws IOException {
void remove(AbstractKQueueChannel ch) {
assert inEventLoop();
if (ch.jniSelfPtr == 0) {
return;
}

jniChannelPointers.add(ch.jniSelfPtr);
ch.jniSelfPtr = 0;
channels.remove(ch.fd().intValue());
}

/**
Expand Down Expand Up @@ -145,32 +144,25 @@ private int kqueueWaitNow() throws IOException {
}

private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
deleteJniChannelPointers();
int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
changeList.clear();
return numEvents;
}

private void deleteJniChannelPointers() {
if (!jniChannelPointers.isEmpty()) {
deleteGlobalRefs(jniChannelPointers.memoryAddress(), jniChannelPointers.memoryAddressEnd());
jniChannelPointers.clear();
}
}

private void processReady(int ready) {
for (int i = 0; i < ready; ++i) {
final short filter = eventList.filter(i);
final short flags = eventList.flags(i);
final int fd = eventList.fd(i);
if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
// EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
// we later attempt to delete the filters from kqueue.
assert filter != Native.EVFILT_USER ||
(filter == Native.EVFILT_USER && eventList.fd(i) == KQUEUE_WAKE_UP_IDENT);
(filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
continue;
}

AbstractKQueueChannel channel = eventList.channel(i);
AbstractKQueueChannel channel = channels.get(fd);
if (channel == null) {
// This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
// We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
Expand Down Expand Up @@ -327,12 +319,6 @@ protected void cleanup() {
}
} finally {
// Cleanup all native memory!

// The JNI channel pointers should already be deleted because we should wait on kevent before this method,
// but lets just be sure we cleanup native memory.
deleteJniChannelPointers();
jniChannelPointers.free();

changeList.free();
eventList.free();
}
Expand Down

0 comments on commit 5ecb34e

Please sign in to comment.