Skip to content

Commit

Permalink
Adapt to the changes in Netty 5 API (#2342)
Browse files Browse the repository at this point in the history
- `@Sharable` is removed netty/netty#12522
- `ByteToMessageDecoderForBuffer` is renamed to `ByteToMessageDecoder` netty/netty#12512
- `ByteBuf` adapters are moved to Netty Contrib netty/netty#12512
- Enable `ObservabilitySmokeTest` netty/netty#12518
- `ByteBufAllocatorMetrics` functionality needs migration #2341
- Adapt CI build

Related to #1873
  • Loading branch information
violetagg committed Jun 29, 2022
1 parent 2a9fb12 commit 0918511
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 182 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/check_transport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,22 @@ jobs:
fetch-depth: 0 #needed by spotless
- uses: actions/checkout@v3
with:
repository: netty-contrib/codec-haproxy
# TODO temporary https://github.com/netty-contrib/codec-haproxy/pull/7
repository: violetagg/codec-haproxy
path: codec-haproxy
ref: remove-forBuffer
- uses: actions/checkout@v3
with:
repository: netty-contrib/codec-extras
# TODO temporary https://github.com/netty-contrib/codec-extras/pull/8
repository: violetagg/codec-extras
path: codec-extras
ref: port-to-buffer
- uses: actions/checkout@v3
with:
repository: netty-contrib/socks-proxy
# TODO temporary https://github.com/netty-contrib/socks-proxy/pull/9
repository: violetagg/socks-proxy
path: socks-proxy
ref: remove-forBuffer
- uses: gradle/wrapper-validation-action@v1
- name: Set up JDK 17
uses: actions/setup-java@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.adaptor.ByteBufAdaptor;
import io.netty5.channel.Channel;
import io.netty5.channel.DefaultFileRegion;
import io.netty5.handler.stream.ChunkedNioFile;
Expand Down Expand Up @@ -80,7 +79,7 @@ default Mono<Void> neverComplete() {
* sequences.</p>
* <p>Note: Nesting any send* method is not supported.</p>
* <p>Note: If you need to transform from {@link io.netty.buffer.ByteBuf} to {@link Buffer}
* you can use {@link ByteBufAdaptor#extractOrCopy(BufferAllocator, io.netty.buffer.ByteBuf)}</p>
* you can use {@link io.netty5.buffer.api.adaptor.ByteBufAdaptor#extractOrCopy(BufferAllocator, io.netty.buffer.ByteBuf)}</p>
*
* @param dataStream the dataStream publishing OUT items to write on this channel
*
Expand All @@ -97,7 +96,7 @@ default NettyOutbound send(Publisher<? extends Buffer> dataStream) {
* sequences.</p>
* <p>Note: Nesting any send* method is not supported.</p>
* <p>Note: If you need to transform from {@link io.netty.buffer.ByteBuf} to {@link Buffer}
* you can use {@link ByteBufAdaptor#extractOrCopy(BufferAllocator, io.netty.buffer.ByteBuf)}</p>
* you can use {@link io.netty5.buffer.api.adaptor.ByteBufAdaptor#extractOrCopy(BufferAllocator, io.netty.buffer.ByteBuf)}</p>
*
* @param dataStream the dataStream publishing OUT items to write on this channel
* @param predicate that returns true if explicit flush operation is needed after that buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,6 @@ public String toString() {
* @author Stephane Maldini
* @author Simon Baslé
*/
@ChannelHandler.Sharable
static final class ExtractorHandler extends ChannelHandlerAdapter {


Expand All @@ -861,6 +860,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
extractor.accept(ctx, msg);
}

@Override
public boolean isSharable() {
return true;
}
}

static final class ChannelDisposer extends BaseSubscriber<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import java.util.Objects;
import java.util.function.Supplier;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelFactory;
import io.netty5.channel.ChannelInitializer;
Expand Down Expand Up @@ -399,7 +396,6 @@ static final class TransportChannelInitializer extends ChannelInitializer<Channe
}

@Override
@SuppressWarnings("removal")
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();

Expand All @@ -408,13 +404,13 @@ protected void initChannel(Channel channel) {

if (Metrics.isInstrumentationAvailable()) {
try {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
}
//ByteBufAllocator alloc = channel.alloc();
//if (alloc instanceof PooledByteBufAllocator) {
// ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
//}
//else if (alloc instanceof UnpooledByteBufAllocator) {
// ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
//}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.executor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.CombinedChannelDuplexHandler;
import io.netty5.handler.codec.ByteToMessageCodec;
import io.netty5.handler.codec.ByteToMessageDecoderForBuffer;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.handler.codec.http.EmptyLastHttpContent;
import io.netty5.handler.codec.http.FullHttpMessage;
import io.netty5.handler.codec.http.HttpContent;
Expand Down Expand Up @@ -276,7 +276,7 @@ public HttpOperations<INBOUND, OUTBOUND> addHandlerFirst(String name, ChannelHan

static void autoAddHttpExtractor(Connection c, String name, ChannelHandler handler) {

if (handler instanceof ByteToMessageDecoderForBuffer
if (handler instanceof ByteToMessageDecoder
|| handler instanceof ByteToMessageCodec
|| handler instanceof CombinedChannelDuplexHandler) {
String extractorName = name + "$extractor";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package reactor.netty.http.client;

import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.DefaultHttpContent;
Expand All @@ -31,7 +30,6 @@
* @author Violeta Georgieva
* @since 1.0.0
*/
@ChannelHandler.Sharable
final class Http2StreamBridgeClientHandler extends ChannelHandlerAdapter {

@Override
Expand All @@ -48,4 +46,9 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
return ctx.write(msg);
}
}

@Override
public boolean isSharable() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.ByteToMessageDecoderForBuffer;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.handler.codec.ProtocolDetectionResult;
import io.netty.contrib.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.contrib.handler.codec.haproxy.HAProxyProtocolVersion;
Expand All @@ -31,7 +31,7 @@
*
* @author aftersss
*/
final class HAProxyMessageDetector extends ByteToMessageDecoderForBuffer {
final class HAProxyMessageDetector extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, Buffer in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,6 @@ public void handlerAdded(ChannelHandlerContext ctx) {
}
}

@ChannelHandler.Sharable
static final class H2CleartextReadContextHandler extends ChannelHandlerAdapter {
static final H2CleartextReadContextHandler INSTANCE = new H2CleartextReadContextHandler();

Expand All @@ -817,6 +816,11 @@ public void channelRegistered(ChannelHandlerContext ctx) {
ctx.fireChannelRegistered();
ctx.pipeline().remove(this);
}

@Override
public boolean isSharable() {
return true;
}
}

static final class H2Codec extends ChannelInitializer<Channel> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.netty5.buffer.api.Buffer;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelPipeline;
import io.netty5.handler.codec.ByteToMessageDecoderForBuffer;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.handler.ssl.SslHandler;
import reactor.netty.NettyPipeline;
import reactor.netty.tcp.SslProvider;
Expand All @@ -35,7 +35,7 @@
* @author James Chen
* @since 1.0.5
*/
final class NonSslRedirectDetector extends ByteToMessageDecoderForBuffer {
final class NonSslRedirectDetector extends ByteToMessageDecoder {

private static final int SSL_RECORD_HEADER_LENGTH = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
Expand All @@ -54,7 +53,6 @@
import static reactor.netty.Metrics.OBSERVATION_REGISTRY;

@SuppressWarnings("rawtypes")
@Disabled
class ObservabilitySmokeTest extends SampleTestRunner {
static byte[] content;
static DisposableServer disposableServer;
Expand Down

This file was deleted.

0 comments on commit 0918511

Please sign in to comment.