Skip to content

Commit

Permalink
Adding optional io_uring support, ergonomics and TCP_NO_DELAY set by …
Browse files Browse the repository at this point in the history
…default
  • Loading branch information
franz1981 committed May 16, 2022
1 parent 706211c commit 0d357d0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<netty.version>4.1.76.Final</netty.version>
<netty.version>4.1.77.Final</netty.version>
<netty.io_uring.version>0.0.14.Final</netty.io_uring.version>
<netty.epoll.classifier>linux-x86_64</netty.epoll.classifier>
<netty.io_uring.classifier>linux-x86_64</netty.io_uring.classifier>
<netty.kqueue.classifier>osx-x86_64</netty.kqueue.classifier>
<log4j.version>2.14.1</log4j.version>
<kafka-clients.version>3.1.0</kafka-clients.version>
Expand Down Expand Up @@ -124,6 +126,12 @@
<artifactId>netty-transport-classes-kqueue</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty.io_uring.version}</version>
<classifier>${netty.io_uring.classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
Expand Down
35 changes: 28 additions & 7 deletions src/main/java/io/strimzi/kproxy/KafkaProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import java.util.List;

import io.netty.channel.epoll.EpollChannelOption;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -48,6 +52,7 @@ public final class KafkaProxy {
private final int brokerPort;
private final boolean logNetwork;
private final boolean logFrames;
private final boolean useIoUring;
private final List<Interceptor> interceptors;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
Expand All @@ -58,6 +63,7 @@ public static void main(String[] args) throws Exception {
Integer.parseInt(System.getProperty("localPort", "9192")),
System.getProperty("remoteHost", "localhost"),
Integer.parseInt(System.getProperty("remotePort", "9092")),
Boolean.getBoolean("useIoUring"),
false,
false,
List.of(
Expand All @@ -84,13 +90,15 @@ public KafkaProxy(
int brokerPort,
boolean logNetwork,
boolean logFrames,
boolean useIoUring,
List<Interceptor> interceptors) {
this.proxyHost = proxyHost;
this.proxyPort = proxyPort;
this.brokerHost = brokerHost;
this.brokerPort = brokerPort;
this.logNetwork = logNetwork;
this.logFrames = logFrames;
this.useIoUring = useIoUring;
this.interceptors = interceptors;
}

Expand Down Expand Up @@ -118,6 +126,10 @@ public String brokerAddress() {
return brokerHost() + ":" + brokerPort();
}

public boolean useIoUring() {
return useIoUring;
}

/**
* Starts this proxy.
* @return This proxy.
Expand All @@ -131,29 +143,38 @@ public KafkaProxy startup() throws InterruptedException {

InterceptorProviderFactory interceptorProviderFactory = new InterceptorProviderFactory(interceptors);
KafkaProxyInitializer initializer = new KafkaProxyInitializer(brokerHost, brokerPort, interceptorProviderFactory, logNetwork, logFrames);

final int availableCores = Runtime.getRuntime().availableProcessors();
// Configure the bootstrap.
final Class<? extends ServerChannel> channelClass;
if (Epoll.isAvailable()) {
if (useIoUring) {
if (!IOUring.isAvailable()) {
throw new IllegalStateException("io_uring not available due to: " + IOUring.unavailabilityCause());
}
bossGroup = new IOUringEventLoopGroup(1);
workerGroup = new IOUringEventLoopGroup(availableCores);
channelClass = IOUringServerSocketChannel.class;
}
else if (Epoll.isAvailable()) {
bossGroup = new EpollEventLoopGroup(1);
workerGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup(availableCores);
channelClass = EpollServerSocketChannel.class;
}
else if (KQueue.isAvailable()) {
bossGroup = new KQueueEventLoopGroup(1);
workerGroup = new KQueueEventLoopGroup();
workerGroup = new KQueueEventLoopGroup(availableCores);
channelClass = KQueueServerSocketChannel.class;
}
else {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(availableCores);
channelClass = NioServerSocketChannel.class;
}
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(channelClass)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(initializer)
.childOption(ChannelOption.AUTO_READ, false);
.childOption(ChannelOption.AUTO_READ, false)
.childOption(ChannelOption.TCP_NODELAY, true);

ChannelFuture bindFuture;
if (proxyHost != null) {
bindFuture = serverBootstrap.bind(proxyHost, proxyPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public void channelActive(ChannelHandlerContext ctx) {
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(backendHandler)
.option(ChannelOption.AUTO_READ, true);
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.TCP_NODELAY, true);

LOGGER.trace("Connecting to outbound {}:{}", remoteHost, remotePort);
ChannelFuture connectFuture = b.connect(remoteHost, remotePort);
Expand Down

0 comments on commit 0d357d0

Please sign in to comment.