Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont pool IovArray but just allocate on the fly #10627

Merged
merged 9 commits into from Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.Limits;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -209,16 +210,22 @@ protected Executor prepareToClose() {
}

private ByteBuf readBuffer;
private IovArray iovArray;

@Override
protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray();
assert iovArray == null;
int numElements = Math.min(in.size(), Limits.IOV_MAX);
ByteBuf iovArrayBuffer = alloc().directBuffer(numElements * IovArray.IOV_SIZE);
iovArray = new IovArray(iovArrayBuffer);
try {
int offset = iovecArray.count();
in.forEachFlushedMessage(iovecArray);
int offset = iovArray.count();
in.forEachFlushedMessage(iovArray);
submissionQueue().addWritev(socket.intValue(),
iovecArray.memoryAddress(offset), iovecArray.count() - offset, (short) 0);
iovArray.memoryAddress(offset), iovArray.count() - offset, (short) 0);
} catch (Exception e) {
iovArray.release();
normanmaurer marked this conversation as resolved.
Show resolved Hide resolved

// This should never happen, anyway fallback to single write.
scheduleWriteSingle(in.current());
}
Expand All @@ -227,6 +234,7 @@ protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {

@Override
protected int scheduleWriteSingle(Object msg) {
assert iovArray == null;
ByteBuf buf = (ByteBuf) msg;
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
Expand All @@ -236,12 +244,13 @@ protected int scheduleWriteSingle(Object msg) {

@Override
protected int scheduleRead0() {
assert readBuffer == null;

final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = allocHandle.allocate(alloc());
IOUringSubmissionQueue submissionQueue = submissionQueue();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());

assert readBuffer == null;
readBuffer = byteBuf;

submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
Expand Down Expand Up @@ -321,6 +330,11 @@ private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,

@Override
boolean writeComplete0(int res, int data, int outstanding) {
IovArray iovArray = this.iovArray;
if (iovArray != null) {
this.iovArray = null;
iovArray.release();
}
if (res >= 0) {
unsafe().outboundBuffer().removeBytes(res);
} else {
Expand Down
Expand Up @@ -19,7 +19,6 @@
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
Expand Down Expand Up @@ -50,7 +49,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private final FileDescriptor eventfd;

private final IovArrays iovArrays;
// The maximum number of bytes for an InetAddress / Inet6Address
private final byte[] inet4AddressArray = new byte[4];
private final byte[] inet6AddressArray = new byte[16];
Expand All @@ -65,14 +63,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
// Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
IOUring.ensureAvailability();

// TODO: Let's hard code this to 8 IovArrays to keep the memory overhead kind of small. We may want to consider
// allow to change this in the future.
iovArrays = new IovArrays(8);
ringBuffer = Native.createRingBuffer(ringSize, iosqeAsyncThreshold, new Runnable() {
@Override
public void run() {
// Once we submitted its safe to clear the IovArrays and so be able to re-use these.
iovArrays.clear();
// NOOP
normanmaurer marked this conversation as resolved.
Show resolved Hide resolved
}
});

Expand Down Expand Up @@ -321,7 +315,6 @@ public void handle(int fd, int res, int flags, int op, short data) {
logger.warn("Failed to close the event fd.", e);
}
ringBuffer.close();
iovArrays.release();
PlatformDependent.freeMemory(eventfdReadBuf);
}

Expand All @@ -337,16 +330,6 @@ protected void wakeup(boolean inEventLoop) {
}
}

IovArray iovArray() {
IovArray iovArray = iovArrays.next();
if (iovArray == null) {
ringBuffer.ioUringSubmissionQueue().submit();
iovArray = iovArrays.next();
assert iovArray != null;
}
return iovArray;
}

/**
* {@code byte[]} that can be used as temporary storage to encode the ipv4 address
*/
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -16,6 +16,7 @@
package io.netty.channel.unix;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOutboundBuffer.MessageProcessor;
import io.netty.util.internal.PlatformDependent;

Expand Down Expand Up @@ -52,23 +53,27 @@ public final class IovArray implements MessageProcessor {
* The size of an {@code iovec} struct in bytes. This is calculated as we have 2 entries each of the size of the
* address.
*/
private static final int IOV_SIZE = 2 * ADDRESS_SIZE;
public static final int IOV_SIZE = 2 * ADDRESS_SIZE;

/**
* The needed memory to hold up to {@code IOV_MAX} iov entries, where {@code IOV_MAX} signified
* the maximum number of {@code iovec} structs that can be passed to {@code writev(...)}.
*/
private static final int CAPACITY = IOV_MAX * IOV_SIZE;
private static final int DEFAULT_CAPACITY = IOV_MAX * IOV_SIZE;

private final ByteBuffer memory;
private final ByteBuf memory;
private final long memoryAddress;
private int count;
private long size;
private long maxBytes = SSIZE_MAX;

public IovArray() {
memory = Buffer.allocateDirectWithNativeOrder(CAPACITY);
memoryAddress = Buffer.memoryAddress(memory);
this(Unpooled.wrappedBuffer(Buffer.allocateDirectWithNativeOrder(DEFAULT_CAPACITY)));
}

public IovArray(ByteBuf memory) {
this.memory = memory;
memoryAddress = memory.memoryAddress();
}

public boolean isFull() {
Expand Down Expand Up @@ -120,7 +125,9 @@ private boolean add(long addr, int len) {

// If there is at least 1 entry then we enforce the maximum bytes. We want to accept at least one entry so we
// will attempt to write some data and make progress.
if (maxBytes - len < size && count > 0) {
if ((maxBytes - len < size && count > 0) ||
// Check if we have enough space left
memory.capacity() < (count + 1) * IOV_SIZE) {
// If the size + len will overflow SSIZE_MAX we stop populate the IovArray. This is done as linux
// not allow to write more bytes then SSIZE_MAX with one writev(...) call and so will
// return 'EINVAL', which will raise an IOException.
Expand All @@ -141,17 +148,17 @@ private boolean add(long addr, int len) {
PlatformDependent.putLong(baseOffset + memoryAddress, addr);
PlatformDependent.putLong(lengthOffset + memoryAddress, len);
} else {
memory.putLong(baseOffset, addr);
memory.putLong(lengthOffset, len);
memory.setLong(baseOffset, addr);
memory.setLong(lengthOffset, len);
}
} else {
assert ADDRESS_SIZE == 4;
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.putInt(baseOffset + memoryAddress, (int) addr);
PlatformDependent.putInt(lengthOffset + memoryAddress, len);
} else {
memory.putInt(baseOffset, (int) addr);
memory.putInt(lengthOffset, len);
memory.setInt(baseOffset, (int) addr);
memory.setInt(lengthOffset, len);
}
}
return true;
Expand Down Expand Up @@ -204,7 +211,7 @@ public long memoryAddress(int offset) {
* Release the {@link IovArray}. Once release further using of it may crash the JVM!
*/
public void release() {
Buffer.free(memory);
memory.release();
}

@Override
Expand Down