Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding optional io_uring support, ergonomics and TCP_NO_DELAY set by default #18

Merged
merged 1 commit into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<netty.version>4.1.76.Final</netty.version>
<netty.version>4.1.77.Final</netty.version>
<netty.io_uring.version>0.0.14.Final</netty.io_uring.version>
<netty.epoll.classifier>linux-x86_64</netty.epoll.classifier>
<netty.io_uring.classifier>linux-x86_64</netty.io_uring.classifier>
<netty.kqueue.classifier>osx-x86_64</netty.kqueue.classifier>
<log4j.version>2.14.1</log4j.version>
<kafka-clients.version>3.1.0</kafka-clients.version>
Expand Down Expand Up @@ -124,6 +126,12 @@
<artifactId>netty-transport-classes-kqueue</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty.io_uring.version}</version>
<classifier>${netty.io_uring.classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
Expand Down
35 changes: 28 additions & 7 deletions src/main/java/io/strimzi/kproxy/KafkaProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import java.util.List;

import io.netty.channel.epoll.EpollChannelOption;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -48,6 +52,7 @@ public final class KafkaProxy {
private final int brokerPort;
private final boolean logNetwork;
private final boolean logFrames;
private final boolean useIoUring;
private final List<Interceptor> interceptors;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
Expand All @@ -58,6 +63,7 @@ public static void main(String[] args) throws Exception {
Integer.parseInt(System.getProperty("localPort", "9192")),
System.getProperty("remoteHost", "localhost"),
Integer.parseInt(System.getProperty("remotePort", "9092")),
Boolean.getBoolean("useIoUring"),
false,
false,
List.of(
Expand All @@ -84,13 +90,15 @@ public KafkaProxy(
int brokerPort,
boolean logNetwork,
boolean logFrames,
boolean useIoUring,
List<Interceptor> interceptors) {
this.proxyHost = proxyHost;
this.proxyPort = proxyPort;
this.brokerHost = brokerHost;
this.brokerPort = brokerPort;
this.logNetwork = logNetwork;
this.logFrames = logFrames;
this.useIoUring = useIoUring;
this.interceptors = interceptors;
}

Expand Down Expand Up @@ -118,6 +126,10 @@ public String brokerAddress() {
return brokerHost() + ":" + brokerPort();
}

public boolean useIoUring() {
return useIoUring;
}

/**
* Starts this proxy.
* @return This proxy.
Expand All @@ -131,29 +143,38 @@ public KafkaProxy startup() throws InterruptedException {

InterceptorProviderFactory interceptorProviderFactory = new InterceptorProviderFactory(interceptors);
KafkaProxyInitializer initializer = new KafkaProxyInitializer(brokerHost, brokerPort, interceptorProviderFactory, logNetwork, logFrames);

final int availableCores = Runtime.getRuntime().availableProcessors();
// Configure the bootstrap.
final Class<? extends ServerChannel> channelClass;
if (Epoll.isAvailable()) {
if (useIoUring) {
if (!IOUring.isAvailable()) {
throw new IllegalStateException("io_uring not available due to: " + IOUring.unavailabilityCause());
}
bossGroup = new IOUringEventLoopGroup(1);
workerGroup = new IOUringEventLoopGroup(availableCores);
channelClass = IOUringServerSocketChannel.class;
}
else if (Epoll.isAvailable()) {
bossGroup = new EpollEventLoopGroup(1);
workerGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup(availableCores);
channelClass = EpollServerSocketChannel.class;
}
else if (KQueue.isAvailable()) {
bossGroup = new KQueueEventLoopGroup(1);
workerGroup = new KQueueEventLoopGroup();
workerGroup = new KQueueEventLoopGroup(availableCores);
channelClass = KQueueServerSocketChannel.class;
}
else {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(availableCores);
channelClass = NioServerSocketChannel.class;
}
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(channelClass)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(initializer)
.childOption(ChannelOption.AUTO_READ, false);
.childOption(ChannelOption.AUTO_READ, false)
.childOption(ChannelOption.TCP_NODELAY, true);

ChannelFuture bindFuture;
if (proxyHost != null) {
bindFuture = serverBootstrap.bind(proxyHost, proxyPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public void channelActive(ChannelHandlerContext ctx) {
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(backendHandler)
.option(ChannelOption.AUTO_READ, true);
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.TCP_NODELAY, true);

LOGGER.trace("Connecting to outbound {}:{}", remoteHost, remotePort);
ChannelFuture connectFuture = b.connect(remoteHost, remotePort);
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/strimzi/kproxy/ProxyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void shouldModifyProduceMessage() throws Exception {
private KafkaProxy startProxy(String proxyHost, int proxyPort, String brokerList, List<Interceptor> interceptors) throws InterruptedException {
String[] hostPort = brokerList.split(",")[0].split(":");

KafkaProxy kafkaProxy = new KafkaProxy(proxyHost, proxyPort, hostPort[0], parseInt(hostPort[1]), true, true, interceptors);
KafkaProxy kafkaProxy = new KafkaProxy(proxyHost, proxyPort, hostPort[0], parseInt(hostPort[1]), true, true, false, interceptors);
kafkaProxy.startup();
return kafkaProxy;
}
Expand Down