Skip to content

Commit

Permalink
Directly use memory addresses for gathering writes to reduce gc press…
Browse files Browse the repository at this point in the history
…ure. Part of [#2239]

This also does factor out some logic of ChannelOutboundBuffer. Mainly we not need nioBuffers() for many
transports and also not need to copy from heap to direct buffer. So this functionality was moved to
NioSocketChannelOutboundBuffer. Also introduce a EpollChannelOutboundBuffer which makes use of
memory addresses for all the writes to reduce GC pressure
  • Loading branch information
Norman Maurer committed Feb 21, 2014
1 parent bea8107 commit 60b830b
Show file tree
Hide file tree
Showing 20 changed files with 1,020 additions and 392 deletions.
91 changes: 71 additions & 20 deletions transport-native-epoll/src/main/c/io_netty_channel_epoll_Native.c
Expand Up @@ -43,6 +43,9 @@ jfieldID fileChannelFieldId = NULL;
jfieldID transferedFieldId = NULL;
jfieldID fdFieldId = NULL;
jfieldID fileDescriptorFieldId = NULL;
jfieldID readerIndexFieldId = NULL;
jfieldID writerIndexFieldId = NULL;
jfieldID memoryAddressFieldId = NULL;
jmethodID inetSocketAddrMethodId = NULL;
jclass runtimeExceptionClass = NULL;
jclass ioExceptionClass = NULL;
Expand Down Expand Up @@ -322,6 +325,27 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
return JNI_ERR;
}
socketType = socket_type();

jclass addressEntryClass = (*env)->FindClass(env, "io/netty/channel/epoll/EpollChannelOutboundBuffer$AddressEntry");
if (addressEntryClass == NULL) {
// pending exception...
return JNI_ERR;
}
readerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "readerIndex", "I");
if (readerIndexFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
writerIndexFieldId = (*env)->GetFieldID(env, addressEntryClass, "writerIndex", "I");
if (writerIndexFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
memoryAddressFieldId = (*env)->GetFieldID(env, addressEntryClass, "memoryAddress", "J");
if (memoryAddressFieldId == NULL) {
// pending exception...
return JNI_ERR;
}
return JNI_VERSION_1_6;
}
}
Expand Down Expand Up @@ -510,6 +534,29 @@ void incrementPosition(JNIEnv * env, jobject bufObj, int written) {
}
}

jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) {
ssize_t res;
int err;
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));

if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// network stack is saturated we will try again later
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
return -1;
}
return (jlong) res;
}

JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length) {
struct iovec iov[length];
int i;
Expand Down Expand Up @@ -541,32 +588,15 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
iov[iovidx].iov_len = (size_t) (limit - pos);
iovidx++;
}

ssize_t res;
int err;
do {
res = writev(fd, iov, length);
// keep on writing if it was interrupted
} while(res == -1 && ((err = errno) == EINTR));

if (res < 0) {
if (err == EAGAIN || err == EWOULDBLOCK) {
// network stack is saturated we will try again later
return 0;
}
if (err == EBADF) {
throwClosedChannelException(env);
return -1;
}
throwIOException(env, exceptionMessage("Error while writev(...): ", err));
return -1;
jlong res = writev0(env, clazz, fd, iov, length);
if (res <= 0) {
return res;
}

// update the position of the written buffers
int written = res;
int a;
for (a = 0; a < length; a++) {
int pos;
int len = iov[a].iov_len;
jobject bufObj = (*env)->GetObjectArrayElement(env, buffers, a + offset);
if (len >= written) {
Expand All @@ -580,6 +610,27 @@ JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writev(JNIEnv * env,
return res;
}

JNIEXPORT jlong JNICALL Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length) {
struct iovec iov[length];
int i;
int iovidx = 0;
for (i = offset; i < length; i++) {
jobject addressEntry = (*env)->GetObjectArrayElement(env, addresses, i);
jint readerIndex = (*env)->GetIntField(env, addressEntry, readerIndexFieldId);
jint writerIndex = (*env)->GetIntField(env, addressEntry, writerIndexFieldId);
void* memoryAddress = (void*) (*env)->GetLongField(env, addressEntry, memoryAddressFieldId);

iov[iovidx].iov_base = memoryAddress + readerIndex;
iov[iovidx].iov_len = (size_t) (writerIndex - readerIndex);
iovidx++;
}

jlong res = writev0(env, clazz, fd, iov, length);
if (res <= 0) {
return res;
}
}

jint read0(JNIEnv * env, jclass clazz, jint fd, void *buffer, jint pos, jint limit) {
ssize_t res;
int err;
Expand Down
Expand Up @@ -32,6 +32,8 @@ void Java_io_netty_channel_epoll_Native_epollCtlDel(JNIEnv * env, jclass clazz,
jint Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
jlong Java_io_netty_channel_epoll_Native_writev(JNIEnv * env, jclass clazz, jint fd, jobjectArray buffers, jint offset, jint length);
jlong Java_io_netty_channel_epoll_Native_writevAddresses(JNIEnv * env, jclass clazz, jint fd, jobjectArray addresses, jint offset, jint length);

jint Java_io_netty_channel_epoll_Native_read(JNIEnv * env, jclass clazz, jint fd, jobject jbuffer, jint pos, jint limit);
jint Java_io_netty_channel_epoll_Native_readAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit);
void JNICALL Java_io_netty_channel_epoll_Native_close(JNIEnv * env, jclass clazz, jint fd);
Expand Down
@@ -0,0 +1,202 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.Recycler;

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
* Special {@link ChannelOutboundBuffer} implementation which allows to obtain an array of {@link AddressEntry}
* and so doing gathering writes without the need to create a {@link ByteBuffer} internally. This reduce
* GC pressure a lot.
*/
final class EpollChannelOutboundBuffer extends ChannelOutboundBuffer {
private AddressEntry[] addresses;
private int addressCount;
private long addressSize;
private static final Recycler<EpollChannelOutboundBuffer> RECYCLER = new Recycler<EpollChannelOutboundBuffer>() {
@Override
protected EpollChannelOutboundBuffer newObject(Handle<EpollChannelOutboundBuffer> handle) {
return new EpollChannelOutboundBuffer(handle);
}
};

/**
* Get a new instance of this {@link EpollChannelOutboundBuffer} and attach it the given {@link EpollSocketChannel}
*/
static EpollChannelOutboundBuffer newInstance(EpollSocketChannel channel) {
EpollChannelOutboundBuffer buffer = RECYCLER.get();
buffer.channel = channel;
return buffer;
}

private EpollChannelOutboundBuffer(Recycler.Handle<? extends ChannelOutboundBuffer> handle) {
super(handle);
addresses = new AddressEntry[INITIAL_CAPACITY];
}

/**
* Check if the message is a {@link ByteBuf} and if so if it has a memoryAddress. If not it will convert this
* {@link ByteBuf} to be able to operate on the memoryAddress directly for maximal performance.
*/
@Override
protected Object beforeAdd(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress()) {
ByteBuf direct = copyToDirectByteBuf(buf);
return direct;
}
}
return msg;
}

/**
* Returns an array of {@link AddressEntry}'s if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #addressCount()} and
* {@link #addressSize()} ()} will return the number of {@link AddressEntry}'s in the returned array and the total
* number of readable bytes of the NIO buffers respectively.
* <p>
* Note that the returned array is reused and thus should not escape
* {@link io.netty.channel.AbstractChannel#doWrite(ChannelOutboundBuffer)}.
* Refer to {@link EpollSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
* </p>
*/
AddressEntry[] memoryAddresses() {
long addressSize = 0;
int addressCount = 0;
final Entry[] buffer = entries();
final int mask = buffer.length - 1;
AddressEntry[] addresses = this.addresses;
Object m;
int unflushed = unflushed();
int flushed = flushed();
while (flushed != unflushed && (m = buffer[flushed].msg()) != null) {
if (!(m instanceof ByteBuf)) {
this.addressCount = 0;
this.addressSize = 0;
return null;
}

AddressEntry entry = (AddressEntry) buffer[flushed];

// Check if the entry was cancelled. if so we just skip it.
if (!entry.isCancelled()) {
ByteBuf buf = (ByteBuf) m;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

if (readableBytes > 0) {
addressSize += readableBytes;
// See if there is enough space to at least store one more entry.
int neededSpace = addressCount + 1;
if (neededSpace > addresses.length) {
this.addresses = addresses =
expandAddressesArray(addresses, neededSpace, addressCount);
}
entry.memoryAddress = buf.memoryAddress();
entry.readerIndex = buf.readerIndex();
entry.writerIndex = buf.writerIndex();

addresses[addressCount ++] = entry;
}
}

flushed = flushed + 1 & mask;
}
this.addressCount = addressCount;
this.addressSize = addressSize;

return addresses;
}

private static AddressEntry[] expandAddressesArray(AddressEntry[] array, int neededSpace, int size) {
int newCapacity = array.length;
do {
// double capacity until it is big enough
// See https://github.com/netty/netty/issues/1890
newCapacity <<= 1;

if (newCapacity < 0) {
throw new IllegalStateException();
}

} while (neededSpace > newCapacity);

AddressEntry[] newArray = new AddressEntry[newCapacity];
System.arraycopy(array, 0, newArray, 0, size);

return newArray;
}

/**
* Return the number of {@link AddressEntry}'s which can be written.
*/
int addressCount() {
return addressCount;
}

/**
* Return the number of bytes that can be written via gathering writes.
*/
long addressSize() {
return addressSize;
}

@Override
public void recycle() {
if (addresses.length > INITIAL_CAPACITY) {
addresses = new AddressEntry[INITIAL_CAPACITY];
} else {
// null out the nio buffers array so the can be GC'ed
// https://github.com/netty/netty/issues/1763
Arrays.fill(addresses, null);
}
super.recycle();
}

@Override
protected AddressEntry newEntry() {
return new AddressEntry();
}

static final class AddressEntry extends Entry {
// These fields will be accessed via JNI directly so be carefully when touch them!
long memoryAddress;
int readerIndex;
int writerIndex;

@Override
public void clear() {
memoryAddress = -1;
readerIndex = 0;
writerIndex = 0;
super.clear();
}

@Override
public int cancel() {
memoryAddress = -1;
readerIndex = 0;
writerIndex = 0;
return super.cancel();
}
}
}

0 comments on commit 60b830b

Please sign in to comment.