diff --git a/build.gradle b/build.gradle index 036b06744..7be8fa2b9 100644 --- a/build.gradle +++ b/build.gradle @@ -89,6 +89,8 @@ subprojects { test { useJUnitPlatform() + + systemProperty "io.netty.leakDetection.level", "ADVANCED" } } diff --git a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java index 3e446026a..559a1ff59 100644 --- a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java @@ -19,7 +19,6 @@ import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.util.AbstractReferenceCounted; import io.rsocket.Frame.Setup; import io.rsocket.frame.SetupFrameFlyweight; @@ -30,38 +29,9 @@ */ public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload { - public static final int NO_FLAGS = 0; - public static final int HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE; - - public static ConnectionSetupPayload create(String metadataMimeType, String dataMimeType) { - return new DefaultConnectionSetupPayload( - metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, NO_FLAGS); - } - - public static ConnectionSetupPayload create( - String metadataMimeType, String dataMimeType, Payload payload) { - return new DefaultConnectionSetupPayload( - metadataMimeType, - dataMimeType, - payload.sliceData(), - payload.sliceMetadata(), - payload.hasMetadata() ? FLAGS_M : 0); - } - - public static ConnectionSetupPayload create( - String metadataMimeType, String dataMimeType, int flags) { - return new DefaultConnectionSetupPayload( - metadataMimeType, dataMimeType, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER, flags); - } - public static ConnectionSetupPayload create(final Frame setupFrame) { Frame.ensureFrameType(FrameType.SETUP, setupFrame); - return new DefaultConnectionSetupPayload( - Setup.metadataMimeType(setupFrame), - Setup.dataMimeType(setupFrame), - setupFrame.sliceData(), - setupFrame.sliceMetadata(), - Setup.getFlags(setupFrame)); + return new DefaultConnectionSetupPayload(setupFrame); } public abstract String metadataMimeType(); @@ -71,7 +41,7 @@ public static ConnectionSetupPayload create(final Frame setupFrame) { public abstract int getFlags(); public boolean willClientHonorLease() { - return Frame.isFlagSet(getFlags(), HONOR_LEASE); + return Frame.isFlagSet(getFlags(), SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE); } @Override @@ -97,68 +67,52 @@ public ConnectionSetupPayload retain(int increment) { private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload { - private final String metadataMimeType; - private final String dataMimeType; - private final ByteBuf data; - private final ByteBuf metadata; - private final int flags; - - public DefaultConnectionSetupPayload( - String metadataMimeType, String dataMimeType, ByteBuf data, ByteBuf metadata, int flags) { - this.metadataMimeType = metadataMimeType; - this.dataMimeType = dataMimeType; - this.data = data; - this.metadata = metadata; - this.flags = flags; - - if (!hasMetadata() && metadata.readableBytes() > 0) { - throw new IllegalArgumentException("metadata flag incorrect"); - } + private final Frame setupFrame; + + public DefaultConnectionSetupPayload(final Frame setupFrame) { + this.setupFrame = setupFrame; } @Override public String metadataMimeType() { - return metadataMimeType; + return Setup.metadataMimeType(setupFrame); } @Override public String dataMimeType() { - return dataMimeType; + return Setup.dataMimeType(setupFrame); } @Override public ByteBuf sliceData() { - return data; + return setupFrame.sliceData(); } @Override public ByteBuf sliceMetadata() { - return metadata; + return setupFrame.sliceMetadata(); } @Override public int getFlags() { - return flags; + return Setup.getFlags(setupFrame); } @Override public ConnectionSetupPayload touch() { - data.touch(); - metadata.touch(); + setupFrame.touch(); return this; } @Override public ConnectionSetupPayload touch(Object hint) { - data.touch(hint); - metadata.touch(hint); + setupFrame.touch(hint); return this; } @Override protected void deallocate() { - data.release(); - metadata.release(); + setupFrame.release(); } } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index f76121c96..f4a4ebb46 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -27,9 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; import javax.annotation.Nullable; - import org.jctools.maps.NonBlockingHashMapLong; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -185,7 +183,7 @@ public Flux requestStream(Payload payload) { @Override public Flux requestChannel(Publisher payloads) { - return handleChannel(Flux.from(payloads), FrameType.REQUEST_CHANNEL); + return handleChannel(Flux.from(payloads)); } @Override @@ -289,108 +287,88 @@ private Mono handleRequestResponse(final Payload payload) { })); } - private Flux handleChannel(Flux request, FrameType requestType) { + private Flux handleChannel(Flux request) { return started.thenMany( Flux.defer( - new Supplier>() { + () -> { final UnicastProcessor receiver = UnicastProcessor.create(); final int streamId = streamIdSupplier.nextStreamId(); - volatile @Nullable MonoProcessor subscribedRequests; - boolean firstRequest = true; - - boolean isValidToSendFrame() { - return contains(streamId) && !receiver.isDisposed(); - } - - void sendOneFrame(Frame frame) { - if (isValidToSendFrame()) { - sendProcessor.onNext(frame); - } - } - - @Override - public Flux get() { - return receiver - .doOnRequest( - l -> { - boolean _firstRequest = false; - synchronized (RSocketClient.this) { - if (firstRequest) { - _firstRequest = true; - firstRequest = false; - } - } + final AtomicBoolean firstRequest = new AtomicBoolean(true); - if (_firstRequest) { - AtomicBoolean firstPayload = new AtomicBoolean(true); - Flux requestFrames = - request - .transform( - f -> { - LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f); - // Need to set this to one for first the frame - wrapped.increaseRequestLimit(1); - senders.put(streamId, wrapped); - receivers.put(streamId, receiver); - - return wrapped; - }) - .map( - new Function() { - - @Override - public Frame apply(Payload payload) { - final Frame requestFrame; - if (firstPayload.compareAndSet(true, false)) { - requestFrame = - Frame.Request.from( - streamId, requestType, payload, l); - } else { - requestFrame = - Frame.PayloadFrame.from( - streamId, FrameType.NEXT, payload); - } - payload.release(); - return requestFrame; - } - }) - .doOnComplete( - () -> { - if (FrameType.REQUEST_CHANNEL == requestType) { - sendOneFrame( - Frame.PayloadFrame.from( - streamId, FrameType.COMPLETE)); - if (firstPayload.get()) { - receiver.onComplete(); - } - } - }); - - requestFrames.subscribe( - sendProcessor::onNext, - t -> { - errorConsumer.accept(t); - receiver.dispose(); - }); - } else { - sendOneFrame(Frame.RequestN.from(streamId, l)); - } - }) - .doOnError(t -> sendOneFrame(Frame.Error.from(streamId, t))) - .doOnCancel( - () -> { - sendOneFrame(Frame.Cancel.from(streamId)); - if (subscribedRequests != null) { - subscribedRequests.cancel(); + return receiver + .doOnRequest( + n -> { + if (firstRequest.compareAndSet(true, false)) { + final AtomicBoolean firstPayload = new AtomicBoolean(true); + final Flux requestFrames = + request + .transform( + f -> { + LimitableRequestPublisher wrapped = + LimitableRequestPublisher.wrap(f); + // Need to set this to one for first the frame + wrapped.increaseRequestLimit(1); + senders.put(streamId, wrapped); + receivers.put(streamId, receiver); + + return wrapped; + }) + .map( + payload -> { + final Frame requestFrame; + if (firstPayload.compareAndSet(true, false)) { + requestFrame = + Frame.Request.from( + streamId, FrameType.REQUEST_CHANNEL, payload, n); + } else { + requestFrame = + Frame.PayloadFrame.from( + streamId, FrameType.NEXT, payload); + } + payload.release(); + return requestFrame; + }) + .doOnComplete( + () -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext( + Frame.PayloadFrame.from( + streamId, FrameType.COMPLETE)); + } + if (firstPayload.get()) { + receiver.onComplete(); + } + }); + + requestFrames.subscribe( + sendProcessor::onNext, + t -> { + errorConsumer.accept(t); + receiver.dispose(); + }); + } else { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.RequestN.from(streamId, n)); } - }) - .doFinally( - s -> { - receivers.remove(streamId); - senders.remove(streamId); - }); - } + } + }) + .doOnError( + t -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.Error.from(streamId, t)); + } + }) + .doOnCancel( + () -> { + if (contains(streamId) && !receiver.isDisposed()) { + sendProcessor.onNext(Frame.Cancel.from(streamId)); + } + }) + .doFinally( + s -> { + receivers.remove(streamId); + senders.remove(streamId); + }); })); } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 5458b3ebb..545dd863f 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -234,15 +234,14 @@ public Mono start() { RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); - RSocketServer rSocketServer = new RSocketServer( - multiplexer.asServerConnection(), - wrappedRSocketServer, - frameDecoder, - errorConsumer); - - return connection - .sendOne(setupFrame) - .thenReturn(wrappedRSocketClient); + RSocketServer rSocketServer = + new RSocketServer( + multiplexer.asServerConnection(), + wrappedRSocketServer, + frameDecoder, + errorConsumer); + + return connection.sendOne(setupFrame).thenReturn(wrappedRSocketClient); }); } } @@ -353,15 +352,18 @@ private Mono processSetupFrame( return acceptor .get() .accept(setupPayload, wrappedRSocketClient) - .doOnNext(unwrappedServerSocket -> { - RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); - - RSocketServer rSocketServer = new RSocketServer( - multiplexer.asClientConnection(), - wrappedRSocketServer, - frameDecoder, - errorConsumer); - }) + .doOnNext( + unwrappedServerSocket -> { + RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); + + RSocketServer rSocketServer = + new RSocketServer( + multiplexer.asClientConnection(), + wrappedRSocketServer, + frameDecoder, + errorConsumer); + }) + .doFinally(signalType -> setupPayload.release()) .then(); } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index d2c5c26b0..2706ecf5e 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -28,7 +28,6 @@ import io.rsocket.internal.UnboundedProcessor; import java.util.function.Consumer; import java.util.function.Function; - import org.jctools.maps.NonBlockingHashMapLong; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -315,6 +314,7 @@ private void handleStream(int streamId, Flux response, int initialReque payload.release(); return frame; }) + .concatWith(Mono.fromCallable(() -> Frame.PayloadFrame.from(streamId, FrameType.COMPLETE))) .transform( frameFlux -> { LimitableRequestPublisher frames = LimitableRequestPublisher.wrap(frameFlux); @@ -322,7 +322,6 @@ private void handleStream(int streamId, Flux response, int initialReque frames.increaseRequestLimit(initialRequestN); return frames; }) - .concatWith(Mono.just(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE))) .doFinally(signalType -> sendingSubscriptions.remove(streamId)) .subscribe(sendProcessor::onNext, t -> handleError(streamId, t)); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java index 57cfe1205..3567584bd 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java @@ -17,8 +17,8 @@ package io.rsocket.frame; import io.netty.buffer.ByteBuf; -import io.rsocket.framing.FrameType; import io.rsocket.exceptions.*; +import io.rsocket.framing.FrameType; import java.nio.charset.StandardCharsets; public class ErrorFrameFlyweight { diff --git a/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java b/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java index 3cd56f93d..09f28e50a 100644 --- a/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java +++ b/rsocket-core/src/main/java/io/rsocket/framing/AbstractRecyclableFrame.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.util.Recycler.Handle; -import io.netty.util.ReferenceCounted; import java.util.Objects; import reactor.util.annotation.Nullable; diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java index 4f04c6437..1f0091c4e 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java @@ -16,6 +16,7 @@ package io.rsocket.internal; +import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiFunction; @@ -41,7 +42,8 @@ public void subscribe(CoreSubscriber actual) { Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer)); } - static final class SwitchTransformSubscriber implements CoreSubscriber { + static final class SwitchTransformSubscriber + implements CoreSubscriber { @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once"); @@ -77,6 +79,7 @@ public void onNext(T t) { Flux.from(result).subscribe(actual); } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, actual.currentContext())); + ReferenceCountUtil.release(t); return; } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 1fdfb5c1e..d8e1997d0 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -16,7 +16,7 @@ package io.rsocket.internal; -import io.netty.util.internal.shaded.org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue; +import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -42,31 +42,35 @@ public final class UnboundedProcessor extends FluxProcessor implements Fuseable.QueueSubscription, Fuseable { + final Queue queue; + + volatile boolean done; + Throwable error; + + volatile CoreSubscriber actual; + + volatile boolean cancelled; + + volatile int once; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "once"); + volatile int wip; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "wip"); + volatile long requested; + @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested"); - final Queue queue; - - volatile boolean done; - Throwable error; - volatile CoreSubscriber actual; - volatile boolean cancelled; - volatile int once; - volatile int wip; - volatile long requested; - volatile long processed; - public UnboundedProcessor() { - this.queue = new MpscGrowableAtomicArrayQueue<>(Queues.SMALL_BUFFER_SIZE, 1 << 24); + this.queue = Queues.unboundedMultiproducer().get(); } @Override @@ -74,6 +78,12 @@ public int getBufferSize() { return Queues.capacity(this.queue); } + @Override + public Object scanUnsafe(Attr key) { + if (Attr.BUFFERED == key) return queue.size(); + return super.scanUnsafe(key); + } + void drainRegular(Subscriber a) { int missed = 1; @@ -97,6 +107,7 @@ void drainRegular(Subscriber a) { if (empty) { break; } + a.onNext(t); e++; @@ -144,7 +155,12 @@ public void drain() { boolean checkTerminated(boolean d, boolean empty, Subscriber a, Queue q) { if (cancelled) { - q.clear(); + while (!q.isEmpty()) { + T t = q.poll(); + if (t != null) { + ReferenceCountUtil.release(t); + } + } actual = null; return true; } @@ -186,6 +202,7 @@ public Context currentContext() { public void onNext(T t) { if (done || cancelled) { Operators.onNextDropped(t, currentContext()); + ReferenceCountUtil.release(t); return; } @@ -193,9 +210,9 @@ public void onNext(T t) { Throwable ex = Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()); onError(Operators.onOperatorError(null, ex, t, currentContext())); + ReferenceCountUtil.release(t); return; } - drain(); } @@ -227,8 +244,9 @@ public void onComplete() { public void subscribe(CoreSubscriber actual) { Objects.requireNonNull(actual, "subscribe"); if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { - this.actual = actual; + actual.onSubscribe(this); + this.actual = actual; if (cancelled) { this.actual = null; } else { @@ -255,8 +273,10 @@ public void cancel() { return; } cancelled = true; + if (WIP.getAndIncrement(this) == 0) { - queue.clear(); + clear(); + actual = null; } } @@ -278,7 +298,12 @@ public boolean isEmpty() { @Override public void clear() { - queue.clear(); + while (!queue.isEmpty()) { + T t = queue.poll(); + if (t != null) { + ReferenceCountUtil.release(t); + } + } } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumeUtil.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumeUtil.java index b34c9d424..da6b3262a 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumeUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumeUtil.java @@ -17,8 +17,8 @@ package io.rsocket.resume; import io.rsocket.Frame; -import io.rsocket.framing.FrameType; import io.rsocket.frame.FrameHeaderFlyweight; +import io.rsocket.framing.FrameType; public class ResumeUtil { public static boolean isTracked(FrameType frameType) { diff --git a/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java index 9f76e075c..6afa0a00e 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java @@ -18,12 +18,10 @@ import static io.rsocket.frame.ErrorFrameFlyweight.*; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import io.rsocket.Frame; import io.rsocket.exceptions.*; import java.nio.charset.StandardCharsets; import org.junit.Test; diff --git a/rsocket-core/src/test/java/io/rsocket/frame/RequestFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/RequestFrameFlyweightTest.java index e7d071058..d8dca2fc4 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/RequestFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/RequestFrameFlyweightTest.java @@ -23,8 +23,8 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.rsocket.Frame; -import io.rsocket.framing.FrameType; import io.rsocket.Payload; +import io.rsocket.framing.FrameType; import io.rsocket.util.DefaultPayload; import java.nio.charset.StandardCharsets; import org.junit.Test; diff --git a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java index 4f852720f..0dc7d9090 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java @@ -16,6 +16,8 @@ package io.rsocket.internal; +import io.rsocket.Payload; +import io.rsocket.util.EmptyPayload; import java.util.concurrent.CountDownLatch; import org.junit.Assert; import org.junit.Test; @@ -52,10 +54,10 @@ public void testOnNextBeforeSubscribe_10_000_000() { } public void testOnNextBeforeSubscribeN(int n) { - UnboundedProcessor processor = new UnboundedProcessor<>(); + UnboundedProcessor processor = new UnboundedProcessor<>(); for (int i = 0; i < n; i++) { - processor.onNext(i); + processor.onNext(EmptyPayload.INSTANCE); } processor.onComplete(); @@ -82,12 +84,12 @@ public void testOnNextAfterSubscribe_1000() throws Exception { public void testOnNextAfterSubscribeN(int n) throws Exception { CountDownLatch latch = new CountDownLatch(n); - UnboundedProcessor processor = new UnboundedProcessor<>(); + UnboundedProcessor processor = new UnboundedProcessor<>(); processor.log().doOnNext(integer -> latch.countDown()).subscribe(); for (int i = 0; i < n; i++) { System.out.println("onNexting -> " + i); - processor.onNext(i); + processor.onNext(EmptyPayload.INSTANCE); } processor.drain(); diff --git a/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java b/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java index 5dd978c9b..f885a221d 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatNullPointerException; -import io.rsocket.util.NumberUtils; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 9487b5277..ad84c05ed 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -21,7 +21,6 @@ import io.rsocket.transport.netty.WebsocketDuplexConnection; import java.util.function.BiFunction; import java.util.function.Consumer; - import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.ipc.netty.http.server.HttpServer; @@ -34,9 +33,8 @@ public class WebsocketRouteTransport implements ServerTransport { private Consumer routesBuilder; private String path; - public WebsocketRouteTransport(HttpServer server, - Consumer routesBuilder, - String path) { + public WebsocketRouteTransport( + HttpServer server, Consumer routesBuilder, String path) { this.server = server; this.routesBuilder = routesBuilder; this.path = path; @@ -45,10 +43,11 @@ public WebsocketRouteTransport(HttpServer server, @Override public Mono start(ConnectionAcceptor acceptor) { return server - .newRouter(routes -> { - routesBuilder.accept(routes); - routes.ws(path, newHandler(acceptor)); - }) + .newRouter( + routes -> { + routesBuilder.accept(routes); + routes.ws(path, newHandler(acceptor)); + }) .map(NettyContextCloseable::new); }