diff --git a/build.gradle b/build.gradle index 6a4600a22..8f3cee000 100644 --- a/build.gradle +++ b/build.gradle @@ -17,7 +17,7 @@ plugins { id 'com.gradle.build-scan' version '1.16' - id 'com.github.sherter.google-java-format' version '0.7.1' + id 'com.github.sherter.google-java-format' version '0.7.1' apply false id 'com.jfrog.artifactory' version '4.7.3' apply false id 'com.jfrog.bintray' version '1.8.4' apply false id 'me.champeau.gradle.jmh' version '0.4.7' apply false @@ -27,6 +27,7 @@ plugins { subprojects { apply plugin: 'io.spring.dependency-management' + apply plugin: 'com.github.sherter.google-java-format' ext['reactor-bom.version'] = 'Californium-SR3' ext['logback.version'] = '1.2.3' @@ -42,6 +43,10 @@ subprojects { ext['micrometer.version'] = '1.0.6' ext['assertj.version'] = '3.11.1' + googleJavaFormat { + toolVersion = '1.6' + } + dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:${ext['reactor-bom.version']}" @@ -188,10 +193,6 @@ buildScan { description = 'RSocket: Stream Oriented Messaging Passing with Reactive Stream Semantics.' -googleJavaFormat { - toolVersion = '1.5' -} - repositories { mavenCentral() } diff --git a/rsocket-core/src/jmh/java/io/rsocket/frame/FrameTypePerf.java b/rsocket-core/src/jmh/java/io/rsocket/frame/FrameTypePerf.java index b7dce221f..efa22104f 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/frame/FrameTypePerf.java +++ b/rsocket-core/src/jmh/java/io/rsocket/frame/FrameTypePerf.java @@ -5,8 +5,8 @@ @BenchmarkMode(Mode.Throughput) @Fork( - value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"} -) + value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"} + ) @Warmup(iterations = 10) @Measurement(iterations = 10) @State(Scope.Thread) diff --git a/rsocket-core/src/jmh/java/io/rsocket/frame/PayloadFlyweightPerf.java b/rsocket-core/src/jmh/java/io/rsocket/frame/PayloadFlyweightPerf.java index 416eaa8dc..89e50a6e9 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/frame/PayloadFlyweightPerf.java +++ b/rsocket-core/src/jmh/java/io/rsocket/frame/PayloadFlyweightPerf.java @@ -53,7 +53,7 @@ public void setup(Blackhole bh) { this.bh = bh; this.frameType = FrameType.REQUEST_RESPONSE; allocator = ByteBufAllocator.DEFAULT; - + // Encode a payload and then copy it a single bytebuf payload = allocator.buffer(); ByteBuf encode = diff --git a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java index 7be954f52..9f1d7ea6b 100644 --- a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java @@ -22,106 +22,107 @@ import io.rsocket.frame.SetupFrameFlyweight; /** - * Exposed to server for determination of ResponderRSocket based on mime types and SETUP metadata/data + * Exposed to server for determination of ResponderRSocket based on mime types and SETUP + * metadata/data */ public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload { - + public static ConnectionSetupPayload create(final ByteBuf setupFrame) { return new DefaultConnectionSetupPayload(setupFrame); } - + public abstract int keepAliveInterval(); - + public abstract int keepAliveMaxLifetime(); - + public abstract String metadataMimeType(); - + public abstract String dataMimeType(); - + public abstract int getFlags(); - + public abstract boolean willClientHonorLease(); - + @Override public ConnectionSetupPayload retain() { super.retain(); return this; } - + @Override public ConnectionSetupPayload retain(int increment) { super.retain(increment); return this; } - + public abstract ConnectionSetupPayload touch(); - + public abstract ConnectionSetupPayload touch(Object hint); - + private static final class DefaultConnectionSetupPayload extends ConnectionSetupPayload { private final ByteBuf setupFrame; - + public DefaultConnectionSetupPayload(ByteBuf setupFrame) { this.setupFrame = setupFrame; } - + @Override public boolean hasMetadata() { return FrameHeaderFlyweight.hasMetadata(setupFrame); } - + @Override public int keepAliveInterval() { return SetupFrameFlyweight.keepAliveInterval(setupFrame); } - + @Override public int keepAliveMaxLifetime() { return SetupFrameFlyweight.keepAliveMaxLifetime(setupFrame); } - + @Override public String metadataMimeType() { return SetupFrameFlyweight.metadataMimeType(setupFrame); } - + @Override public String dataMimeType() { return SetupFrameFlyweight.dataMimeType(setupFrame); } - + @Override public int getFlags() { return FrameHeaderFlyweight.flags(setupFrame); } - + @Override public boolean willClientHonorLease() { return SetupFrameFlyweight.honorLease(setupFrame); } - + @Override public ConnectionSetupPayload touch() { setupFrame.touch(); return this; } - + @Override public ConnectionSetupPayload touch(Object hint) { setupFrame.touch(hint); return this; } - + @Override protected void deallocate() { setupFrame.release(); } - + @Override public ByteBuf sliceMetadata() { return SetupFrameFlyweight.metadata(setupFrame); } - + @Override public ByteBuf sliceData() { return SetupFrameFlyweight.data(setupFrame); diff --git a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java index 0e653ba3e..7739a34c0 100644 --- a/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/DuplexConnection.java @@ -16,9 +16,8 @@ package io.rsocket; -import java.nio.channels.ClosedChannelException; - import io.netty.buffer.ByteBuf; +import java.nio.channels.ClosedChannelException; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import reactor.core.publisher.Flux; @@ -28,8 +27,8 @@ public interface DuplexConnection extends Availability, Closeable { /** - * Sends the source of Frames on this connection and returns the {@code Publisher} - * representing the result of this send. + * Sends the source of Frames on this connection and returns the {@code Publisher} representing + * the result of this send. * *

Flow control

* diff --git a/rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java b/rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java index e8662516c..7eda01fbe 100644 --- a/rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java @@ -4,14 +4,13 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.frame.KeepAliveFrameFlyweight; +import java.time.Duration; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.UnicastProcessor; -import java.time.Duration; - abstract class KeepAliveHandler implements Disposable { private final KeepAlive keepAlive; private final UnicastProcessor sent = UnicastProcessor.create(); @@ -45,8 +44,12 @@ public void dispose() { public void receive(ByteBuf keepAliveFrame) { this.lastReceivedMillis = System.currentTimeMillis(); if (KeepAliveFrameFlyweight.respondFlag(keepAliveFrame)) { - doSend(KeepAliveFrameFlyweight.encode(ByteBufAllocator.DEFAULT, false, 0, - KeepAliveFrameFlyweight.data(keepAliveFrame).retain())); + doSend( + KeepAliveFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + false, + 0, + KeepAliveFrameFlyweight.data(keepAliveFrame).retain())); } } @@ -92,7 +95,8 @@ private static final class Client extends KeepAliveHandler { @Override void onIntervalTick() { doCheckTimeout(); - doSend(KeepAliveFrameFlyweight.encode(ByteBufAllocator.DEFAULT, true, 0, Unpooled.EMPTY_BUFFER)); + doSend( + KeepAliveFrameFlyweight.encode(ByteBufAllocator.DEFAULT, true, 0, Unpooled.EMPTY_BUFFER)); } } diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 7a4ac8404..27a882d01 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -26,14 +26,6 @@ import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; import io.rsocket.internal.UnicastMonoProcessor; -import org.reactivestreams.Processor; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.publisher.UnicastProcessor; - import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.Collections; @@ -41,6 +33,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.UnicastProcessor; /** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */ class RSocketClient implements RSocket { diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index e29bd6386..8e8afda0a 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -33,13 +33,12 @@ import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.util.EmptyPayload; -import reactor.core.publisher.Mono; - import java.time.Duration; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import reactor.core.publisher.Mono; /** Factory for creating RSocket clients and servers. */ public class RSocketFactory { diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index ead6d1b81..2b0eadaf2 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -26,6 +26,9 @@ import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.internal.LimitableRequestPublisher; import io.rsocket.internal.UnboundedProcessor; +import java.util.Collections; +import java.util.Map; +import java.util.function.Consumer; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -36,10 +39,6 @@ import reactor.core.publisher.SignalType; import reactor.core.publisher.UnicastProcessor; -import java.util.Collections; -import java.util.Map; -import java.util.function.Consumer; - /** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */ class RSocketServer implements RSocket { diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java b/rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java index b2ac93ced..97de65a96 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java @@ -16,13 +16,12 @@ package io.rsocket.exceptions; +import static io.rsocket.frame.ErrorFrameFlyweight.*; + import io.netty.buffer.ByteBuf; import io.rsocket.frame.ErrorFrameFlyweight; - import java.util.Objects; -import static io.rsocket.frame.ErrorFrameFlyweight.*; - /** Utility class that generates an exception from a frame. */ public final class Exceptions { diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index af2e95895..af492508d 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -30,29 +30,26 @@ * and Reassembly */ public final class FragmentationDuplexConnection implements DuplexConnection { - public FragmentationDuplexConnection(DuplexConnection connection, int mtu) { - - } - + public FragmentationDuplexConnection(DuplexConnection connection, int mtu) {} + @Override public Mono send(Publisher frames) { return null; } - + @Override public Flux receive() { return null; } - + @Override public Mono onClose() { return null; } - + @Override - public void dispose() { - - } + public void dispose() {} + /* private final ByteBufAllocator byteBufAllocator; @@ -62,20 +59,23 @@ public void dispose() { private final IntObjectHashMap frameReassemblers = new IntObjectHashMap<>(); - *//** + */ + /** * Creates a new instance. * * @param delegate the {@link DuplexConnection} to decorate * @param maxFragmentSize the maximum fragment size * @throws NullPointerException if {@code delegate} is {@code null} * @throws IllegalArgumentException if {@code maxFragmentSize} is not {@code positive} - *//* + */ + /* // TODO: Remove once ByteBufAllocators are shared public FragmentationDuplexConnection(DuplexConnection delegate, int maxFragmentSize) { this(PooledByteBufAllocator.DEFAULT, delegate, maxFragmentSize); } - *//** + */ + /** * Creates a new instance. * * @param byteBufAllocator the {@link ByteBufAllocator} to use @@ -84,7 +84,8 @@ public FragmentationDuplexConnection(DuplexConnection delegate, int maxFragmentS * be fragmented. * @throws NullPointerException if {@code byteBufAllocator} or {@code delegate} are {@code null} * @throws IllegalArgumentException if {@code maxFragmentSize} is not {@code positive} - *//* + */ + /* public FragmentationDuplexConnection( ByteBufAllocator byteBufAllocator, DuplexConnection delegate, int maxFragmentSize) { diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java index 4f3346d5a..e9b4de243 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java @@ -16,10 +16,6 @@ package io.rsocket.fragmentation; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import reactor.core.publisher.Flux; - /** * The implementation of the RSocket fragmentation behavior. * @@ -28,32 +24,36 @@ * and Reassembly */ final class FrameFragmenter { -/* + /* private final ByteBufAllocator byteBufAllocator; private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final int maxFragmentSize; - *//** + */ + /** * Creates a new instance * * @param byteBufAllocator the {@link ByteBufAllocator} to use * @param maxFragmentSize the maximum size of each fragment - *//* + */ + /* FrameFragmenter(ByteBufAllocator byteBufAllocator, int maxFragmentSize) { this.byteBufAllocator = Objects.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null"); this.maxFragmentSize = maxFragmentSize; } - *//** + */ + /** * Returns a {@link Flux} of fragments frames * * @param frame the {@link ByteBuf} to fragment * @return a {@link Flux} of fragment frames * @throws NullPointerException if {@code frame} is {@code null} - *//* + */ + /* public Flux fragment(ByteBuf frame) { Objects.requireNonNull(frame, "frame must not be null"); diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java index 420ae2a5a..a44883915 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java @@ -16,7 +16,6 @@ package io.rsocket.fragmentation; -import io.netty.buffer.ByteBufAllocator; import reactor.core.Disposable; /** @@ -28,10 +27,8 @@ */ final class FrameReassembler implements Disposable { @Override - public void dispose() { - - } - + public void dispose() {} + @Override public boolean isDisposed() { return false; @@ -61,25 +58,29 @@ public void dispose() { handle.recycle(this); } - *//** + */ + /** * Creates a new instance * * @param byteBufAllocator the {@link ByteBufAllocator} to use * @return the {@code FrameReassembler} * @throws NullPointerException if {@code byteBufAllocator} is {@code null} - *//* + */ + /* static FrameReassembler createFrameReassembler(ByteBufAllocator byteBufAllocator) { return RECYCLER.get().setByteBufAllocator(byteBufAllocator); } - *//** + */ + /** * Reassembles a frame. If the frame is not a candidate for fragmentation, emits the frame. If * frame is a candidate for fragmentation, accumulates the content until the final fragment. * * @param frame the frame to inspect for reassembly * @return the reassembled frame if complete, otherwise {@code null} * @throws NullPointerException if {@code frame} is {@code null} - *//* + */ + /* @Nullable Frame reassemble(Frame frame) { Objects.requireNonNull(frame, "frame must not be null"); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java index 8ce495452..f07f5f004 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/DataAndMetadataFlyweight.java @@ -54,7 +54,7 @@ static ByteBuf metadataWithoutMarking(ByteBuf byteBuf, boolean hasMetadata) { } } - static ByteBuf metadata(ByteBuf byteBuf,boolean hasMetadata) { + static ByteBuf metadata(ByteBuf byteBuf, boolean hasMetadata) { byteBuf.markReaderIndex(); ByteBuf metadata = metadataWithoutMarking(byteBuf, hasMetadata); byteBuf.resetReaderIndex(); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java index 7dec87e59..55e23541e 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java @@ -4,7 +4,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.rsocket.exceptions.RSocketException; - import java.nio.charset.StandardCharsets; public class ErrorFrameFlyweight { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ErrorType.java b/rsocket-core/src/main/java/io/rsocket/frame/ErrorType.java index 1a4e438e3..ccbff374e 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ErrorType.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ErrorType.java @@ -1,6 +1,5 @@ package io.rsocket.frame; - /** * The types of {@link Error} that can be set. * @@ -8,68 +7,68 @@ * Codes */ public final class ErrorType { - + /** * Application layer logic generating a Reactive Streams onError event. Stream ID MUST be > 0. */ public static final int APPLICATION_ERROR = 0x00000201;; - + /** * The Responder canceled the request but may have started processing it (similar to REJECTED but * doesn't guarantee lack of side-effects). Stream ID MUST be > 0. */ public static final int CANCELED = 0x00000203; - + /** * The connection is being terminated. Stream ID MUST be 0. Sender or Receiver of this frame MUST * wait for outstanding streams to terminate before closing the connection. New requests MAY not * be accepted. */ public static final int CONNECTION_CLOSE = 0x00000102; - + /** * The connection is being terminated. Stream ID MUST be 0. Sender or Receiver of this frame MAY * close the connection immediately without waiting for outstanding streams to terminate. */ public static final int CONNECTION_ERROR = 0x00000101; - + /** The request is invalid. Stream ID MUST be > 0. */ public static final int INVALID = 0x00000204; - + /** * The Setup frame is invalid for the server (it could be that the client is too recent for the * old server). Stream ID MUST be 0. */ public static final int INVALID_SETUP = 0x00000001; - + /** * Despite being a valid request, the Responder decided to reject it. The Responder guarantees * that it didn't process the request. The reason for the rejection is explained in the Error Data * section. Stream ID MUST be > 0. */ public static final int REJECTED = 0x00000202; - + /** * The server rejected the resume, it can specify the reason in the payload. Stream ID MUST be 0. */ public static final int REJECTED_RESUME = 0x00000004; - + /** * The server rejected the setup, it can specify the reason in the payload. Stream ID MUST be 0. */ public static final int REJECTED_SETUP = 0x00000003; - + /** Reserved. */ public static final int RESERVED = 0x00000000; - + /** Reserved for Extension Use. */ public static final int RESERVED_FOR_EXTENSION = 0xFFFFFFFF; - + /** * Some (or all) of the parameters specified by the client are unsupported by the server. Stream * ID MUST be 0. */ public static final int UNSUPPORTED_SETUP = 0x00000002; - + private ErrorType() {} } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java index d4a049f14..18516eb27 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameFlyweight.java @@ -2,15 +2,17 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; - import javax.annotation.Nullable; public class ExtensionFrameFlyweight { private ExtensionFrameFlyweight() {} public static ByteBuf encode( - ByteBufAllocator allocator, int streamId, int extendedType, - @Nullable ByteBuf metadata, ByteBuf data) { + ByteBufAllocator allocator, + int streamId, + int extendedType, + @Nullable ByteBuf metadata, + ByteBuf data) { int flags = FrameHeaderFlyweight.FLAGS_I; @@ -41,7 +43,7 @@ public static ByteBuf data(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); byteBuf.markReaderIndex(); - //Extended type + // Extended type byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES); ByteBuf data = DataAndMetadataFlyweight.dataWithoutMarking(byteBuf, hasMetadata); byteBuf.resetReaderIndex(); @@ -53,7 +55,7 @@ public static ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderFlyweight.hasMetadata(byteBuf); byteBuf.markReaderIndex(); - //Extended type + // Extended type byteBuf.skipBytes(FrameHeaderFlyweight.size() + Integer.BYTES); ByteBuf metadata = DataAndMetadataFlyweight.metadataWithoutMarking(byteBuf, hasMetadata); byteBuf.resetReaderIndex(); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/FrameType.java b/rsocket-core/src/main/java/io/rsocket/frame/FrameType.java index b8c55bc6b..8ac743f87 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/FrameType.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/FrameType.java @@ -25,12 +25,12 @@ * Types */ public enum FrameType { - + /** Reserved. */ RESERVED(0x00), - + // CONNECTION - + /** * Sent by client to initiate protocol processing. * @@ -39,7 +39,7 @@ public enum FrameType { * Frame */ SETUP(0x01, Flags.CAN_HAVE_DATA | Flags.CAN_HAVE_METADATA), - + /** * Sent by Responder to grant the ability to send requests. * @@ -48,7 +48,7 @@ public enum FrameType { * Frame */ LEASE(0x02, Flags.CAN_HAVE_METADATA), - + /** * Connection keepalive. * @@ -57,9 +57,9 @@ public enum FrameType { * Frame */ KEEPALIVE(0x03, Flags.CAN_HAVE_DATA), - + // START REQUEST - + /** * Request single response. * @@ -73,7 +73,7 @@ public enum FrameType { | Flags.CAN_HAVE_METADATA | Flags.IS_FRAGMENTABLE | Flags.IS_REQUEST_TYPE), - + /** * A single one-way message. * @@ -86,7 +86,7 @@ public enum FrameType { | Flags.CAN_HAVE_METADATA | Flags.IS_FRAGMENTABLE | Flags.IS_REQUEST_TYPE), - + /** * Request a completable stream. * @@ -101,7 +101,7 @@ public enum FrameType { | Flags.HAS_INITIAL_REQUEST_N | Flags.IS_FRAGMENTABLE | Flags.IS_REQUEST_TYPE), - + /** * Request a completable stream in both directions. * @@ -116,9 +116,9 @@ public enum FrameType { | Flags.HAS_INITIAL_REQUEST_N | Flags.IS_FRAGMENTABLE | Flags.IS_REQUEST_TYPE), - + // DURING REQUEST - + /** * Request N more items with Reactive Streams semantics. * @@ -127,7 +127,7 @@ public enum FrameType { * Frame */ REQUEST_N(0x08), - + /** * Cancel outstanding request. * @@ -135,9 +135,9 @@ public enum FrameType { * Frame */ CANCEL(0x09), - + // RESPONSE - + /** * Payload on a stream. For example, response to a request, or message on a channel. * @@ -145,7 +145,7 @@ public enum FrameType { * Frame */ PAYLOAD(0x0A, Flags.CAN_HAVE_DATA | Flags.CAN_HAVE_METADATA | Flags.IS_FRAGMENTABLE), - + /** * Error at connection or application level. * @@ -153,9 +153,9 @@ public enum FrameType { * Frame */ ERROR(0x0B, Flags.CAN_HAVE_DATA), - + // METADATA - + /** * Asynchronous Metadata frame. * @@ -164,9 +164,9 @@ public enum FrameType { * Push Frame */ METADATA_PUSH(0x0C, Flags.CAN_HAVE_METADATA), - + // RESUMPTION - + /** * Replaces SETUP for Resuming Operation (optional). * @@ -174,7 +174,7 @@ public enum FrameType { * Frame */ RESUME(0x0D), - + /** * Sent in response to a RESUME if resuming operation possible (optional). * @@ -183,18 +183,18 @@ public enum FrameType { * Frame */ RESUME_OK(0x0E), - + // SYNTHETIC PAYLOAD TYPES - + /** A {@link #PAYLOAD} frame with {@code NEXT} flag set. */ NEXT(0xA0, Flags.CAN_HAVE_DATA | Flags.CAN_HAVE_METADATA | Flags.IS_FRAGMENTABLE), - + /** A {@link #PAYLOAD} frame with {@code COMPLETE} flag set. */ COMPLETE(0xB0), - + /** A {@link #PAYLOAD} frame with {@code NEXT} and {@code COMPLETE} flags set. */ NEXT_COMPLETE(0xC0, Flags.CAN_HAVE_DATA | Flags.CAN_HAVE_METADATA | Flags.IS_FRAGMENTABLE), - + /** * Used To Extend more frame types as well as extensions. * @@ -202,32 +202,32 @@ public enum FrameType { * Frame */ EXT(0x3F, Flags.CAN_HAVE_DATA | Flags.CAN_HAVE_METADATA); - + /** The size of the encoded frame type */ static final int ENCODED_SIZE = 6; - + private static final FrameType[] FRAME_TYPES_BY_ENCODED_TYPE; - + static { FRAME_TYPES_BY_ENCODED_TYPE = new FrameType[getMaximumEncodedType() + 1]; - + for (FrameType frameType : values()) { FRAME_TYPES_BY_ENCODED_TYPE[frameType.encodedType] = frameType; } } - + private final int encodedType; private final int flags; - + FrameType(int encodedType) { this(encodedType, Flags.EMPTY); } - + FrameType(int encodedType, int flags) { this.encodedType = encodedType; this.flags = flags; } - + /** * Returns the {@code FrameType} that matches the specified {@code encodedType}. * @@ -236,18 +236,18 @@ public enum FrameType { */ public static FrameType fromEncodedType(int encodedType) { FrameType frameType = FRAME_TYPES_BY_ENCODED_TYPE[encodedType]; - + if (frameType == null) { throw new IllegalArgumentException(String.format("Frame type %d is unknown", encodedType)); } - + return frameType; } - + private static int getMaximumEncodedType() { return Arrays.stream(values()).mapToInt(frameType -> frameType.encodedType).max().orElse(0); } - + /** * Whether the frame type can have data. * @@ -256,7 +256,7 @@ private static int getMaximumEncodedType() { public boolean canHaveData() { return Flags.CAN_HAVE_DATA == (flags & Flags.CAN_HAVE_DATA); } - + /** * Whether the frame type can have metadata * @@ -265,7 +265,7 @@ public boolean canHaveData() { public boolean canHaveMetadata() { return Flags.CAN_HAVE_METADATA == (flags & Flags.CAN_HAVE_METADATA); } - + /** * Returns the encoded type. * @@ -274,7 +274,7 @@ public boolean canHaveMetadata() { public int getEncodedType() { return encodedType; } - + /** * Whether the frame type starts with an initial {@code requestN}. * @@ -283,7 +283,7 @@ public int getEncodedType() { public boolean hasInitialRequestN() { return Flags.HAS_INITIAL_REQUEST_N == (flags & Flags.HAS_INITIAL_REQUEST_N); } - + /** * Whether the frame type is fragmentable. * @@ -292,7 +292,7 @@ public boolean hasInitialRequestN() { public boolean isFragmentable() { return Flags.IS_FRAGMENTABLE == (flags & Flags.IS_FRAGMENTABLE); } - + /** * Whether the frame type is a request type. * @@ -301,15 +301,15 @@ public boolean isFragmentable() { public boolean isRequestType() { return Flags.IS_REQUEST_TYPE == (flags & Flags.IS_REQUEST_TYPE); } - + private static class Flags { - private static final int EMPTY = 0b00000; - private static final int CAN_HAVE_DATA = 0b10000; - private static final int CAN_HAVE_METADATA = 0b01000; - private static final int IS_FRAGMENTABLE = 0b00100; - private static final int IS_REQUEST_TYPE = 0b00010; + private static final int EMPTY = 0b00000; + private static final int CAN_HAVE_DATA = 0b10000; + private static final int CAN_HAVE_METADATA = 0b01000; + private static final int IS_FRAGMENTABLE = 0b00100; + private static final int IS_REQUEST_TYPE = 0b00010; private static final int HAS_INITIAL_REQUEST_N = 0b00001; - + private Flags() {} } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFlyweight.java index c49f3639c..8747da0bb 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFlyweight.java @@ -3,7 +3,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; - import javax.annotation.Nullable; public class LeaseFlyweight { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java index 15d048eb4..83f2406dd 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameFlyweight.java @@ -51,18 +51,17 @@ public static ByteBuf encodeNextComplete( payload.hasMetadata() ? payload.sliceMetadata().retain() : null, payload.sliceData().retain()); } - - public static ByteBuf encodeNext( - ByteBufAllocator allocator, int streamId, Payload payload) { + + public static ByteBuf encodeNext(ByteBufAllocator allocator, int streamId, Payload payload) { return FLYWEIGHT.encode( - allocator, - streamId, - false, - false, - true, - 0, - payload.hasMetadata() ? payload.sliceMetadata().retain() : null, - payload.sliceData().retain()); + allocator, + streamId, + false, + false, + true, + 0, + payload.hasMetadata() ? payload.sliceMetadata().retain() : null, + payload.sliceData().retain()); } public static ByteBuf encodeComplete(ByteBufAllocator allocator, int streamId) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java index 0a6f44f63..fb6ecebb0 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameFlyweight.java @@ -18,9 +18,7 @@ public static ByteBuf encode( ByteBuf metadata, ByteBuf data) { - int reqN = requestN > Integer.MAX_VALUE - ? Integer.MAX_VALUE - : (int) requestN; + int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; return FLYWEIGHT.encode( allocator, streamId, fragmentFollows, complete, false, reqN, metadata, data); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java index 7510b58f2..8196c56d8 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFlyweight.java @@ -2,7 +2,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; - import javax.annotation.Nullable; class RequestFlyweight { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java index 8fce45b7c..3e858f5d4 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameFlyweight.java @@ -47,9 +47,7 @@ public static ByteBuf encode( long requestN, ByteBuf metadata, ByteBuf data) { - int reqN = requestN > Integer.MAX_VALUE - ? Integer.MAX_VALUE - : (int) requestN; + int reqN = requestN > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) requestN; return encode(allocator, streamId, fragmentFollows, reqN, metadata, data); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ResumeFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ResumeFlyweight.java index 617be553a..899957718 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ResumeFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ResumeFlyweight.java @@ -1,4 +1,3 @@ package io.rsocket.frame; -public class ResumeFlyweight { -} +public class ResumeFlyweight {} diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ResumeOkFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/ResumeOkFlyweight.java index 87513e410..d947a3f76 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ResumeOkFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ResumeOkFlyweight.java @@ -1,4 +1,3 @@ package io.rsocket.frame; -public class ResumeOkFlyweight { -} +public class ResumeOkFlyweight {} diff --git a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java index 6bd29c629..2a5992419 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameFlyweight.java @@ -4,7 +4,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; - import java.nio.charset.StandardCharsets; public class SetupFrameFlyweight { @@ -23,7 +22,8 @@ public class SetupFrameFlyweight { private static final int KEEPALIVE_INTERVAL_FIELD_OFFSET = VERSION_FIELD_OFFSET + Integer.BYTES; private static final int KEEPALIVE_MAX_LIFETIME_FIELD_OFFSET = KEEPALIVE_INTERVAL_FIELD_OFFSET + Integer.BYTES; - private static final int VARIABLE_DATA_OFFSET = KEEPALIVE_MAX_LIFETIME_FIELD_OFFSET + Integer.BYTES; + private static final int VARIABLE_DATA_OFFSET = + KEEPALIVE_MAX_LIFETIME_FIELD_OFFSET + Integer.BYTES; public static ByteBuf encode( final ByteBufAllocator allocator, diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java index 66a24cd9d..692dcb363 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java @@ -5,14 +5,11 @@ import io.rsocket.Payload; import io.rsocket.frame.*; import io.rsocket.util.ByteBufPayload; -import io.rsocket.util.DefaultPayload; -import io.rsocket.util.EmptyPayload; - import java.nio.ByteBuffer; /** Default Frame decoder that copies the frames contents for easy of use. */ class DefaultPayloadDecoder implements PayloadDecoder { - + @Override public Payload apply(ByteBuf byteBuf) { ByteBuf m; @@ -47,15 +44,15 @@ public Payload apply(ByteBuf byteBuf) { default: throw new IllegalArgumentException("unsupported frame type: " + type); } - + ByteBuffer metadata = ByteBuffer.allocateDirect(m.readableBytes()); ByteBuffer data = ByteBuffer.allocateDirect(d.readableBytes()); - + data.put(d.nioBuffer()); data.flip(); metadata.put(m.nioBuffer()); metadata.flip(); - + return ByteBufPayload.create(data, metadata); } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/PayloadDecoder.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/PayloadDecoder.java index 80896f71e..197eca9b0 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/decoder/PayloadDecoder.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/PayloadDecoder.java @@ -2,7 +2,6 @@ import io.netty.buffer.ByteBuf; import io.rsocket.Payload; - import java.util.function.Function; public interface PayloadDecoder extends Function { diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index bcfa77287..607a7ec73 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -17,7 +17,10 @@ package io.rsocket.internal; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -29,11 +32,6 @@ import reactor.util.concurrent.Queues; import reactor.util.context.Context; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - /** * A Processor implementation that takes a custom queue and allows only a single subscriber. * @@ -47,12 +45,15 @@ public final class UnboundedProcessor extends FluxProcessor @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "once"); + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(UnboundedProcessor.class, "wip"); + @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested"); + final Queue queue; volatile boolean done; Throwable error; @@ -123,27 +124,27 @@ void drainRegular(Subscriber a) { } } } - + void drainFused(Subscriber a) { int missed = 1; - + final Queue q = queue; - - for (;;) { - + + for (; ; ) { + if (cancelled) { q.clear(); actual = null; return; } - + boolean d = done; - + a.onNext(null); - + if (d) { actual = null; - + Throwable ex = error; if (ex != null) { a.onError(ex); @@ -152,26 +153,25 @@ void drainFused(Subscriber a) { } return; } - + missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } } - - + public void drain() { if (WIP.getAndIncrement(this) != 0) { return; } int missed = 1; - + for (; ; ) { Subscriber a = actual; if (a != null) { - + if (outputFused) { drainFused(a); } else { @@ -344,7 +344,7 @@ public void clear() { } } } - + @Override public int requestFusion(int requestedMode) { if ((requestedMode & Fuseable.ASYNC) != 0) { @@ -353,7 +353,7 @@ public int requestFusion(int requestedMode) { } return Fuseable.NONE; } - + @Override public void dispose() { cancel(); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java index 47849c4c0..1e616b427 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java @@ -1,5 +1,9 @@ package io.rsocket.internal; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.LongSupplier; +import java.util.stream.Stream; import org.reactivestreams.Processor; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -12,11 +16,6 @@ import reactor.util.context.Context; import reactor.util.function.Tuple2; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.function.LongSupplier; -import java.util.stream.Stream; - public class UnicastMonoProcessor extends Mono implements Processor, CoreSubscriber, diff --git a/rsocket-core/src/main/java/io/rsocket/lease/Lease.java b/rsocket-core/src/main/java/io/rsocket/lease/Lease.java index c36513b7b..62ce16907 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/Lease.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/Lease.java @@ -17,7 +17,6 @@ package io.rsocket.lease; import io.netty.buffer.ByteBuf; - import javax.annotation.Nullable; /** A contract for RSocket lease, which is sent by a request acceptor and is time bound. */ diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumeCache.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumeCache.java index 7e83ab492..f85949c60 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumeCache.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumeCache.java @@ -17,9 +17,8 @@ package io.rsocket.resume; import io.netty.buffer.ByteBuf; -import reactor.core.publisher.Flux; - import java.util.*; +import reactor.core.publisher.Flux; public class ResumeCache { private final ResumePositionCounter strategy; diff --git a/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java b/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java index a6c59c735..84e6a6a43 100644 --- a/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java +++ b/rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java @@ -8,6 +8,12 @@ import io.rsocket.frame.KeepAliveFrameFlyweight; import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -16,13 +22,6 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.stream.Stream; - public class KeepAliveTest { private static final int CLIENT_REQUESTER_TICK_PERIOD = 100; private static final int CLIENT_REQUESTER_TIMEOUT = 700; diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java index 54e3c3c7b..ae3bfc489 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java @@ -2,6 +2,10 @@ import io.rsocket.RSocketClientTest.ClientSocketRule; import io.rsocket.util.EmptyPayload; +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.Arrays; +import java.util.function.Function; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -11,17 +15,10 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; - -import java.nio.channels.ClosedChannelException; -import java.time.Duration; -import java.util.Arrays; -import java.util.function.Function; - @RunWith(Parameterized.class) public class RSocketClientTerminationTest { - - @Rule - public final ClientSocketRule rule = new ClientSocketRule(); + + @Rule public final ClientSocketRule rule = new ClientSocketRule(); private Function> interaction; public RSocketClientTerminationTest(Function> interaction) { @@ -32,9 +29,7 @@ public RSocketClientTerminationTest(Function> in public void testCurrentStreamIsTerminatedOnConnectionClose() { RSocketClient rSocket = rule.socket; - Mono.delay(Duration.ofSeconds(1)) - .doOnNext(v -> rule.connection.dispose()) - .subscribe(); + Mono.delay(Duration.ofSeconds(1)).doOnNext(v -> rule.connection.dispose()).subscribe(); StepVerifier.create(interaction.apply(rSocket)) .expectError(ClosedChannelException.class) diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index e797385fe..2224ba393 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -16,6 +16,13 @@ package io.rsocket; +import static io.rsocket.frame.FrameHeaderFlyweight.frameType; +import static io.rsocket.frame.FrameType.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.exceptions.ApplicationErrorException; @@ -24,6 +31,10 @@ import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -34,18 +45,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import static io.rsocket.frame.FrameHeaderFlyweight.frameType; -import static io.rsocket.frame.FrameType.*; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; - public class RSocketClientTest { @Rule public final ClientSocketRule rule = new ClientSocketRule(); diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java index 4ca1caabb..1d3417e02 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java @@ -16,6 +16,10 @@ package io.rsocket; +import static io.rsocket.frame.FrameHeaderFlyweight.frameType; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; @@ -24,20 +28,15 @@ import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Subscriber; import reactor.core.publisher.Mono; -import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import static io.rsocket.frame.FrameHeaderFlyweight.frameType; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; - public class RSocketServerTest { @Rule public final ServerSocketRule rule = new ServerSocketRule(); diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketTest.java index 0e2ebb116..7fcf46674 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketTest.java @@ -16,6 +16,11 @@ package io.rsocket; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.exceptions.ApplicationErrorException; @@ -23,6 +28,7 @@ import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; +import java.util.ArrayList; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Rule; @@ -37,13 +43,6 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.util.ArrayList; - -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; - public class RSocketTest { @Rule public final SocketRule rule = new SocketRule(); @@ -88,12 +87,9 @@ public Mono requestResponse(Payload payload) { @Test(timeout = 2000) public void testChannel() throws Exception { Flux requests = - Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i)); + Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i)); Flux responses = rule.crs.requestChannel(requests); - StepVerifier.create(responses) - .expectNextCount(10) - .expectComplete() - .verify(); + StepVerifier.create(responses).expectNextCount(10).expectComplete().verify(); } public static class SocketRule extends ExternalResource { diff --git a/rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java index 5cd0bebbc..2326f338d 100644 --- a/rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java @@ -1,5 +1,8 @@ package io.rsocket; +import static io.rsocket.transport.ServerTransport.ConnectionAcceptor; +import static org.assertj.core.api.Assertions.assertThat; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.exceptions.Exceptions; @@ -11,19 +14,15 @@ import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.transport.ServerTransport; import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import org.junit.Ignore; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; import reactor.core.publisher.UnicastProcessor; import reactor.test.StepVerifier; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import static io.rsocket.transport.ServerTransport.ConnectionAcceptor; -import static org.assertj.core.api.Assertions.assertThat; - @Ignore public class SetupRejectionTest { diff --git a/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java b/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java index 50d55c896..c7bbfadf6 100644 --- a/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java +++ b/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java @@ -16,10 +16,8 @@ package io.rsocket.exceptions; -import static org.assertj.core.api.Assertions.assertThat; - final class ExceptionsTest { -/* + /* @DisplayName("from returns ApplicationErrorException") @Test void fromApplicationException() { diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java index e033cacf5..6b25ac902 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java @@ -17,7 +17,7 @@ package io.rsocket.fragmentation; final class FragmentationDuplexConnectionTest { -/* + /* private final DuplexConnection delegate = mock(DuplexConnection.class, RETURNS_SMART_NULLS); @SuppressWarnings("unchecked") diff --git a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java index 418038ce8..467f6c2e7 100644 --- a/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/fragmentation/FrameReassemblerTest.java @@ -17,7 +17,7 @@ package io.rsocket.fragmentation; final class FrameReassemblerTest { -/* + /* @DisplayName("createFrameReassembler throws NullPointerException") @Test void createFrameReassemblerNullByteBufAllocator() { diff --git a/rsocket-core/src/test/java/io/rsocket/frame/ByteBufRepresentation.java b/rsocket-core/src/test/java/io/rsocket/frame/ByteBufRepresentation.java index 137cd20a3..b22a95c0b 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/ByteBufRepresentation.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/ByteBufRepresentation.java @@ -20,13 +20,13 @@ import org.assertj.core.presentation.StandardRepresentation; public final class ByteBufRepresentation extends StandardRepresentation { - + @Override protected String fallbackToStringOf(Object object) { if (object instanceof ByteBuf) { return ByteBufUtil.prettyHexDump((ByteBuf) object); } - + return super.fallbackToStringOf(object); } -} \ No newline at end of file +} diff --git a/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java index b220194b0..fa663432c 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/ErrorFrameFlyweightTest.java @@ -1,23 +1,21 @@ package io.rsocket.frame; +import static org.junit.jupiter.api.Assertions.*; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.rsocket.exceptions.ApplicationErrorException; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; - class ErrorFrameFlyweightTest { @Test void testEncode() { - ByteBuf frame = ErrorFrameFlyweight - .encode( - ByteBufAllocator.DEFAULT, - 1, new ApplicationErrorException("d")); - + ByteBuf frame = + ErrorFrameFlyweight.encode(ByteBufAllocator.DEFAULT, 1, new ApplicationErrorException("d")); + frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); assertEquals("00000b000000012c000000020164", ByteBufUtil.hexDump(frame)); frame.release(); } -} \ No newline at end of file +} diff --git a/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java index ad4e47513..e337d4332 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/ExtensionFrameFlyweightTest.java @@ -3,11 +3,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; - public class ExtensionFrameFlyweightTest { @Test @@ -16,7 +15,8 @@ void extensionDataMetadata() { ByteBuf data = bytebuf("d"); int extendedType = 1; - ByteBuf extension = ExtensionFrameFlyweight.encode(ByteBufAllocator.DEFAULT, 1, extendedType, metadata, data); + ByteBuf extension = + ExtensionFrameFlyweight.encode(ByteBufAllocator.DEFAULT, 1, extendedType, metadata, data); Assertions.assertTrue(FrameHeaderFlyweight.hasMetadata(extension)); Assertions.assertEquals(extendedType, ExtensionFrameFlyweight.extendedType(extension)); @@ -30,8 +30,8 @@ void extensionData() { ByteBuf data = bytebuf("d"); int extendedType = 1; - ByteBuf extension = ExtensionFrameFlyweight.encode(ByteBufAllocator.DEFAULT, 1, - extendedType, null, data); + ByteBuf extension = + ExtensionFrameFlyweight.encode(ByteBufAllocator.DEFAULT, 1, extendedType, null, data); Assertions.assertFalse(FrameHeaderFlyweight.hasMetadata(extension)); Assertions.assertEquals(extendedType, ExtensionFrameFlyweight.extendedType(extension)); @@ -45,8 +45,9 @@ void extensionMetadata() { ByteBuf metadata = bytebuf("md"); int extendedType = 1; - ByteBuf extension = ExtensionFrameFlyweight.encode(ByteBufAllocator.DEFAULT, 1, - extendedType, metadata, Unpooled.EMPTY_BUFFER); + ByteBuf extension = + ExtensionFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, 1, extendedType, metadata, Unpooled.EMPTY_BUFFER); Assertions.assertTrue(FrameHeaderFlyweight.hasMetadata(extension)); Assertions.assertEquals(extendedType, ExtensionFrameFlyweight.extendedType(extension)); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/FrameHeaderFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/FrameHeaderFlyweightTest.java index 42494dc89..a17fcc205 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/FrameHeaderFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/FrameHeaderFlyweightTest.java @@ -1,43 +1,33 @@ package io.rsocket.frame; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; - class FrameHeaderFlyweightTest { // Taken from spec private static final int FRAME_MAX_SIZE = 16_777_215; - + @Test void typeAndFlag() { FrameType frameType = FrameType.REQUEST_FNF; int flags = 0b1110110111; - ByteBuf header = FrameHeaderFlyweight - .encode( - ByteBufAllocator.DEFAULT, - 0, - frameType, - flags); - + ByteBuf header = FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 0, frameType, flags); + assertEquals(flags, FrameHeaderFlyweight.flags(header)); assertEquals(frameType, FrameHeaderFlyweight.frameType(header)); header.release(); } - + @Test void typeAndFlagTruncated() { FrameType frameType = FrameType.SETUP; int flags = 0b11110110111; // 1 bit too many - ByteBuf header = FrameHeaderFlyweight - .encode( - ByteBufAllocator.DEFAULT, - 0, - frameType, - flags); - + ByteBuf header = FrameHeaderFlyweight.encode(ByteBufAllocator.DEFAULT, 0, frameType, flags); + assertNotEquals(flags, FrameHeaderFlyweight.flags(header)); assertEquals(flags & 0b0000_0011_1111_1111, FrameHeaderFlyweight.flags(header)); assertEquals(frameType, FrameHeaderFlyweight.frameType(header)); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/KeepaliveFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/KeepaliveFrameFlyweightTest.java index 00ef513fb..eb55e89cd 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/KeepaliveFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/KeepaliveFrameFlyweightTest.java @@ -1,15 +1,14 @@ package io.rsocket.frame; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import org.junit.jupiter.api.Test; - import java.nio.charset.StandardCharsets; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; class KeepaliveFrameFlyweightTest { @Test diff --git a/rsocket-core/src/test/java/io/rsocket/frame/LeaseFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/LeaseFlyweightTest.java index 137f546ef..5c226f309 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/LeaseFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/LeaseFlyweightTest.java @@ -3,11 +3,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; - public class LeaseFlyweightTest { @Test diff --git a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java index c7d99afdb..9ef89326a 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/PayloadFlyweightTest.java @@ -5,17 +5,17 @@ import io.netty.buffer.Unpooled; import io.rsocket.Payload; import io.rsocket.util.DefaultPayload; +import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; - public class PayloadFlyweightTest { @Test void nextCompleteDataMetadata() { Payload payload = DefaultPayload.create("d", "md"); - ByteBuf nextComplete = PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); + ByteBuf nextComplete = + PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(nextComplete).toString(StandardCharsets.UTF_8); String metadata = PayloadFrameFlyweight.metadata(nextComplete).toString(StandardCharsets.UTF_8); Assertions.assertEquals("d", data); @@ -26,7 +26,8 @@ void nextCompleteDataMetadata() { @Test void nextCompleteData() { Payload payload = DefaultPayload.create("d"); - ByteBuf nextComplete = PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); + ByteBuf nextComplete = + PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); String data = PayloadFrameFlyweight.data(nextComplete).toString(StandardCharsets.UTF_8); ByteBuf metadata = PayloadFrameFlyweight.metadata(nextComplete); Assertions.assertEquals("d", data); @@ -36,11 +37,12 @@ void nextCompleteData() { @Test void nextCompleteMetaData() { - Payload payload = DefaultPayload.create( - Unpooled.EMPTY_BUFFER, - Unpooled.wrappedBuffer("md".getBytes(StandardCharsets.UTF_8))); + Payload payload = + DefaultPayload.create( + Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer("md".getBytes(StandardCharsets.UTF_8))); - ByteBuf nextComplete = PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); + ByteBuf nextComplete = + PayloadFrameFlyweight.encodeNextComplete(ByteBufAllocator.DEFAULT, 1, payload); ByteBuf data = PayloadFrameFlyweight.data(nextComplete); String metadata = PayloadFrameFlyweight.metadata(nextComplete).toString(StandardCharsets.UTF_8); Assertions.assertTrue(data.readableBytes() == 0); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java index c788b074c..9acec2c81 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/RequestFlyweightTest.java @@ -1,14 +1,13 @@ package io.rsocket.frame; +import static org.junit.jupiter.api.Assertions.*; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import org.junit.jupiter.api.Test; - import java.nio.charset.StandardCharsets; - -import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; class RequestFlyweightTest { @Test @@ -27,52 +26,54 @@ void testEncoding() { assertEquals("000010000000011900000000010000026d6464", ByteBufUtil.hexDump(frame)); frame.release(); } - + @Test void testEncodingWithEmptyMetadata() { ByteBuf frame = - RequestStreamFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - 1, - Unpooled.EMPTY_BUFFER, - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); - + RequestStreamFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + 1, + Unpooled.EMPTY_BUFFER, + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - + assertEquals("00000e0000000119000000000100000064", ByteBufUtil.hexDump(frame)); frame.release(); } - + @Test void testEncodingWithNullMetadata() { ByteBuf frame = - RequestStreamFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - 1, - null, - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); - + RequestStreamFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + 1, + null, + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + frame = FrameLengthFlyweight.encode(ByteBufAllocator.DEFAULT, frame.readableBytes(), frame); - + assertEquals("00000b0000000118000000000164", ByteBufUtil.hexDump(frame)); frame.release(); } @Test void requestResponseDataMetadata() { - ByteBuf request = RequestResponseFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + ByteBuf request = + RequestResponseFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); String data = RequestResponseFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); - String metadata = RequestResponseFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); + String metadata = + RequestResponseFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("d", data); @@ -82,12 +83,13 @@ void requestResponseDataMetadata() { @Test void requestResponseData() { - ByteBuf request = RequestResponseFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - null, - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + ByteBuf request = + RequestResponseFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + null, + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); String data = RequestResponseFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); ByteBuf metadata = RequestResponseFrameFlyweight.metadata(request); @@ -100,15 +102,17 @@ void requestResponseData() { @Test void requestResponseMetadata() { - ByteBuf request = RequestResponseFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), - Unpooled.EMPTY_BUFFER); + ByteBuf request = + RequestResponseFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), + Unpooled.EMPTY_BUFFER); ByteBuf data = RequestResponseFrameFlyweight.data(request); - String metadata = RequestResponseFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); + String metadata = + RequestResponseFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); assertTrue(data.readableBytes() == 0); @@ -118,17 +122,19 @@ void requestResponseMetadata() { @Test void requestStreamDataMetadata() { - ByteBuf request = RequestStreamFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - Integer.MAX_VALUE + 1L, - Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + ByteBuf request = + RequestStreamFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + Integer.MAX_VALUE + 1L, + Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); String data = RequestStreamFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); - String metadata = RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); + String metadata = + RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); assertEquals(Integer.MAX_VALUE, actualRequest); @@ -139,13 +145,14 @@ void requestStreamDataMetadata() { @Test void requestStreamData() { - ByteBuf request = RequestStreamFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - 42, - null, - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + ByteBuf request = + RequestStreamFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + 42, + null, + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); String data = RequestStreamFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); @@ -160,17 +167,19 @@ void requestStreamData() { @Test void requestStreamMetadata() { - ByteBuf request = RequestStreamFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - 42, - Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), - Unpooled.EMPTY_BUFFER); + ByteBuf request = + RequestStreamFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + 42, + Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), + Unpooled.EMPTY_BUFFER); int actualRequest = RequestStreamFrameFlyweight.initialRequestN(request); ByteBuf data = RequestStreamFrameFlyweight.data(request); - String metadata = RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); + String metadata = + RequestStreamFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); assertEquals(42, actualRequest); @@ -181,15 +190,17 @@ void requestStreamMetadata() { @Test void requestFnfDataAndMetadata() { - ByteBuf request = RequestFireAndForgetFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + ByteBuf request = + RequestFireAndForgetFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); String data = RequestFireAndForgetFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); - String metadata = RequestFireAndForgetFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); + String metadata = + RequestFireAndForgetFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("d", data); @@ -199,12 +210,13 @@ void requestFnfDataAndMetadata() { @Test void requestFnfData() { - ByteBuf request = RequestFireAndForgetFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - null, - Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); + ByteBuf request = + RequestFireAndForgetFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + null, + Unpooled.copiedBuffer("d", StandardCharsets.UTF_8)); String data = RequestFireAndForgetFrameFlyweight.data(request).toString(StandardCharsets.UTF_8); ByteBuf metadata = RequestFireAndForgetFrameFlyweight.metadata(request); @@ -217,15 +229,17 @@ void requestFnfData() { @Test void requestFnfMetadata() { - ByteBuf request = RequestFireAndForgetFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - 1, - false, - Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), - Unpooled.EMPTY_BUFFER); + ByteBuf request = + RequestFireAndForgetFrameFlyweight.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + Unpooled.copiedBuffer("md", StandardCharsets.UTF_8), + Unpooled.EMPTY_BUFFER); ByteBuf data = RequestFireAndForgetFrameFlyweight.data(request); - String metadata = RequestFireAndForgetFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); + String metadata = + RequestFireAndForgetFrameFlyweight.metadata(request).toString(StandardCharsets.UTF_8); assertTrue(FrameHeaderFlyweight.hasMetadata(request)); assertEquals("md", metadata); diff --git a/rsocket-core/src/test/java/io/rsocket/frame/RequestNFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/RequestNFrameFlyweightTest.java index 6ffb2e123..4411b98c9 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/RequestNFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/RequestNFrameFlyweightTest.java @@ -1,12 +1,12 @@ package io.rsocket.frame; +import static org.junit.jupiter.api.Assertions.assertEquals; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; - class RequestNFrameFlyweightTest { @Test void testEncoding() { diff --git a/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameFlyweightTest.java index b8c36747b..36c9946aa 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameFlyweightTest.java @@ -1,15 +1,14 @@ package io.rsocket.frame; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import org.junit.jupiter.api.Test; - import java.nio.charset.StandardCharsets; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import org.junit.jupiter.api.Test; class SetupFrameFlyweightTest { @Test diff --git a/rsocket-core/src/test/java/io/rsocket/frame/VersionFlyweightTest.java b/rsocket-core/src/test/java/io/rsocket/frame/VersionFlyweightTest.java index 25be38482..3f311c7ef 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/VersionFlyweightTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/VersionFlyweightTest.java @@ -16,9 +16,10 @@ package io.rsocket.frame; -import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; + public class VersionFlyweightTest { @Test public void simple() { diff --git a/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java b/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java index f8d1a980b..6cbf050ac 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java @@ -16,21 +16,21 @@ package io.rsocket.internal; +import static org.junit.Assert.assertEquals; + import io.netty.buffer.ByteBufAllocator; import io.rsocket.frame.ErrorFrameFlyweight; import io.rsocket.plugins.PluginRegistry; import io.rsocket.test.util.TestDuplexConnection; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; - public class ClientServerInputMultiplexerTest { private TestDuplexConnection source; private ClientServerInputMultiplexer multiplexer; private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; + @Before public void setup() { source = new TestDuplexConnection(); @@ -58,7 +58,7 @@ public void testSplits() { .receive() .doOnNext(f -> connectionFrames.incrementAndGet()) .subscribe(); - + source.addToReceivedBuffer(ErrorFrameFlyweight.encode(allocator, 1, new Exception())); assertEquals(1, clientFrames.get()); assertEquals(0, serverFrames.get()); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 2e7c3b66d..fd48cd9d3 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -18,6 +18,9 @@ import io.netty.buffer.ByteBuf; import io.rsocket.DuplexConnection; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.slf4j.Logger; @@ -27,10 +30,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; - /** * An implementation of {@link DuplexConnection} that provides functionality to modify the behavior * dynamically. diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestSubscriber.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestSubscriber.java index e5a1a3d4f..e88b39648 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestSubscriber.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestSubscriber.java @@ -19,7 +19,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import io.netty.buffer.ByteBuf; import io.rsocket.Payload; import org.mockito.Mockito; import org.reactivestreams.Subscriber; diff --git a/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java b/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java index 87e569fde..988bd523d 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java @@ -16,11 +16,11 @@ package io.rsocket.util; +import static org.assertj.core.api.Assertions.*; + import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import static org.assertj.core.api.Assertions.*; - final class NumberUtilsTest { @DisplayName("returns int value with postitive int") diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java index accd34400..7bea75318 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java @@ -23,16 +23,6 @@ import io.rsocket.stat.Median; import io.rsocket.stat.Quantile; import io.rsocket.util.Clock; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; - import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayList; @@ -43,6 +33,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; /** * An implementation of {@link Mono} that load balances across a pool of RSockets and emits one when diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/RSocketSupplierPool.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/RSocketSupplierPool.java index 6f1fba30a..35615d2a2 100644 --- a/rsocket-load-balancer/src/main/java/io/rsocket/client/RSocketSupplierPool.java +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/RSocketSupplierPool.java @@ -2,6 +2,11 @@ import io.rsocket.Closeable; import io.rsocket.client.filter.RSocketSupplier; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -10,12 +15,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Consumer; -import java.util.function.Supplier; - public class RSocketSupplierPool implements Supplier>, Consumer, Closeable { private static final Logger logger = LoggerFactory.getLogger(RSocketSupplierPool.class); @@ -178,11 +177,11 @@ private void close(Collection suppliers) { } } } - + public synchronized int poolSize() { return factoryPool.size(); } - + public synchronized boolean isPoolEmpty() { return factoryPool.isEmpty(); } diff --git a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java index 8a1f1bf6b..20d58dcb7 100644 --- a/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java +++ b/rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java @@ -16,23 +16,22 @@ package io.rsocket.micrometer; +import static io.rsocket.frame.FrameType.*; + import io.micrometer.core.instrument.*; import io.netty.buffer.ByteBuf; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameHeaderFlyweight; import io.rsocket.frame.FrameType; import io.rsocket.plugins.DuplexConnectionInterceptor.Type; +import java.util.Objects; +import java.util.function.Consumer; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Objects; -import java.util.function.Consumer; - -import static io.rsocket.frame.FrameType.*; - /** * An implementation of {@link DuplexConnection} that intercepts frames and gathers Micrometer * metrics about them. diff --git a/rsocket-micrometer/src/test/java/io/rsocket/micrometer/MicrometerDuplexConnectionTest.java b/rsocket-micrometer/src/test/java/io/rsocket/micrometer/MicrometerDuplexConnectionTest.java index 880b5df5f..03abd2084 100644 --- a/rsocket-micrometer/src/test/java/io/rsocket/micrometer/MicrometerDuplexConnectionTest.java +++ b/rsocket-micrometer/src/test/java/io/rsocket/micrometer/MicrometerDuplexConnectionTest.java @@ -14,9 +14,16 @@ * limitations under the License. */ - package io.rsocket.micrometer; +import static io.rsocket.frame.FrameType.*; +import static io.rsocket.plugins.DuplexConnectionInterceptor.Type.CLIENT; +import static io.rsocket.plugins.DuplexConnectionInterceptor.Type.SERVER; +import static io.rsocket.test.TestFrames.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import static org.mockito.Mockito.*; + import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -33,14 +40,6 @@ import reactor.core.publisher.Operators; import reactor.test.StepVerifier; -import static io.rsocket.frame.FrameType.*; -import static io.rsocket.plugins.DuplexConnectionInterceptor.Type.CLIENT; -import static io.rsocket.plugins.DuplexConnectionInterceptor.Type.SERVER; -import static io.rsocket.test.TestFrames.*; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNullPointerException; -import static org.mockito.Mockito.*; - final class MicrometerDuplexConnectionTest { private final DuplexConnection delegate = mock(DuplexConnection.class, RETURNS_SMART_NULLS); diff --git a/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java b/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java index 8617408e8..ec143b7ab 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java +++ b/rsocket-test/src/main/java/io/rsocket/test/ClientSetupRule.java @@ -48,9 +48,7 @@ public ClientSetupRule( this.serverInit = address -> RSocketFactory.receive() - .acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket( - data, - metadata))) + .acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket(data, metadata))) .transport(serverTransportSupplier.apply(address)) .start() .block(); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java b/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java index b2326f543..18c23057a 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TestFrames.java @@ -30,96 +30,82 @@ public final class TestFrames { private TestFrames() {} - /** - * @return {@link ByteBuf} representing test instance of Cancel frame - */ + /** @return {@link ByteBuf} representing test instance of Cancel frame */ public static ByteBuf createTestCancelFrame() { return CancelFrameFlyweight.encode(allocator, 1); } - /** - * @return {@link ByteBuf} representing test instance of Error frame - */ + /** @return {@link ByteBuf} representing test instance of Error frame */ public static ByteBuf createTestErrorFrame() { return ErrorFrameFlyweight.encode(allocator, 1, new RuntimeException()); } - /** - * @return {@link ByteBuf} representing test instance of Extension frame - */ + /** @return {@link ByteBuf} representing test instance of Extension frame */ public static ByteBuf createTestExtensionFrame() { - return ExtensionFrameFlyweight.encode(allocator, 1, 1, - Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER); + return ExtensionFrameFlyweight.encode( + allocator, 1, 1, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER); } - /** - * @return {@link ByteBuf} representing test instance of Keep-Alive frame - */ + /** @return {@link ByteBuf} representing test instance of Keep-Alive frame */ public static ByteBuf createTestKeepaliveFrame() { return KeepAliveFrameFlyweight.encode(allocator, false, 1, Unpooled.EMPTY_BUFFER); } - /** - * @return {@link ByteBuf} representing test instance of Lease frame - */ + /** @return {@link ByteBuf} representing test instance of Lease frame */ public static ByteBuf createTestLeaseFrame() { return LeaseFlyweight.encode(allocator, 1, 1, null); } - /** - * @return {@link ByteBuf} representing test instance of Metadata-Push frame - */ + /** @return {@link ByteBuf} representing test instance of Metadata-Push frame */ public static ByteBuf createTestMetadataPushFrame() { return MetadataPushFrameFlyweight.encode(allocator, Unpooled.EMPTY_BUFFER); } - /** - * @return {@link ByteBuf} representing test instance of Payload frame - */ + /** @return {@link ByteBuf} representing test instance of Payload frame */ public static ByteBuf createTestPayloadFrame() { - return PayloadFrameFlyweight.encode(allocator,1, false,true, false, null, Unpooled.EMPTY_BUFFER); + return PayloadFrameFlyweight.encode( + allocator, 1, false, true, false, null, Unpooled.EMPTY_BUFFER); } - /** - * @return {@link ByteBuf} representing test instance of Request-Channel frame - */ + /** @return {@link ByteBuf} representing test instance of Request-Channel frame */ public static ByteBuf createTestRequestChannelFrame() { - return RequestChannelFrameFlyweight.encode(allocator, 1, false, false, 1, null, Unpooled.EMPTY_BUFFER); + return RequestChannelFrameFlyweight.encode( + allocator, 1, false, false, 1, null, Unpooled.EMPTY_BUFFER); } - /** - * @return {@link ByteBuf} representing test instance of Fire-and-Forget frame - */ + /** @return {@link ByteBuf} representing test instance of Fire-and-Forget frame */ public static ByteBuf createTestRequestFireAndForgetFrame() { - return RequestFireAndForgetFrameFlyweight.encode(allocator, 1, false, null, Unpooled.EMPTY_BUFFER); + return RequestFireAndForgetFrameFlyweight.encode( + allocator, 1, false, null, Unpooled.EMPTY_BUFFER); } - /** - * @return {@link ByteBuf} representing test instance of Request-N frame - */ + /** @return {@link ByteBuf} representing test instance of Request-N frame */ public static ByteBuf createTestRequestNFrame() { - return RequestNFrameFlyweight.encode(allocator,1,1); + return RequestNFrameFlyweight.encode(allocator, 1, 1); } - /** - * @return {@link ByteBuf} representing test instance of Request-Response frame - */ + /** @return {@link ByteBuf} representing test instance of Request-Response frame */ public static ByteBuf createTestRequestResponseFrame() { return RequestResponseFrameFlyweight.encode(allocator, 1, false, emptyPayload); } - /** - * @return {@link ByteBuf} representing test instance of Request-Stream frame - */ + /** @return {@link ByteBuf} representing test instance of Request-Stream frame */ public static ByteBuf createTestRequestStreamFrame() { - return RequestStreamFrameFlyweight.encode(allocator, 1,false, 1L, emptyPayload); + return RequestStreamFrameFlyweight.encode(allocator, 1, false, 1L, emptyPayload); } - /** - * @return {@link ByteBuf} representing test instance of Setup frame - */ + /** @return {@link ByteBuf} representing test instance of Setup frame */ public static ByteBuf createTestSetupFrame() { return SetupFrameFlyweight.encode( - allocator, false, false, 1, 1, Unpooled.EMPTY_BUFFER, "metadataType", "dataType", null, Unpooled.EMPTY_BUFFER); + allocator, + false, + false, + 1, + 1, + Unpooled.EMPTY_BUFFER, + "metadataType", + "dataType", + null, + Unpooled.EMPTY_BUFFER); } } diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index f6293a204..fc6301d7d 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -23,6 +23,10 @@ import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -32,11 +36,6 @@ import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; -import java.time.Duration; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Supplier; - public interface TransportTest { @AfterEach @@ -167,17 +166,18 @@ default void requestChannel3() { .expectComplete() .verify(getTimeout()); } - + @DisplayName("makes 1 requestChannel request with 512 payloads") @Test default void requestChannel512() { Flux payloads = Flux.range(0, 512).map(this::createTestPayload); - + Flux.range(0, 1024) - .flatMap(v -> Mono.fromRunnable(()-> check(payloads)).subscribeOn(Schedulers.elastic()), 12) + .flatMap( + v -> Mono.fromRunnable(() -> check(payloads)).subscribeOn(Schedulers.elastic()), 12) .blockLast(); } - + default void check(Flux payloads) { getClient() .requestChannel(payloads) @@ -204,8 +204,8 @@ default void requestResponse1() { @Test default void requestResponse10() { Flux.range(1, 10) - .flatMap(i -> getClient().requestResponse(createTestPayload(i)) - .doOnNext(v -> assertPayload(v))) + .flatMap( + i -> getClient().requestResponse(createTestPayload(i)).doOnNext(v -> assertPayload(v))) .as(StepVerifier::create) .expectNextCount(10) .expectComplete() @@ -277,16 +277,13 @@ default void requestStreamDelayedRequestN() { default void assertPayload(Payload p) { TransportPair transportPair = getTransportPair(); if (!transportPair.expectedPayloadData().equals(p.getDataUtf8()) - || - !transportPair.expectedPayloadMetadata().equals(p.getMetadataUtf8())) { + || !transportPair.expectedPayloadMetadata().equals(p.getMetadataUtf8())) { throw new IllegalStateException("Unexpected payload"); } } default void assertChannelPayload(Payload p) { - if (!"test-data".equals(p.getDataUtf8()) - || - !"metadata".equals(p.getMetadataUtf8())) { + if (!"test-data".equals(p.getDataUtf8()) || !"metadata".equals(p.getMetadataUtf8())) { throw new IllegalStateException("Unexpected payload"); } } @@ -308,9 +305,7 @@ public TransportPair( server = RSocketFactory.receive() - .acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket( - data, - metadata))) + .acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket(data, metadata))) .transport(serverTransportSupplier.apply(address)) .start() .block(); diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index 5474a2917..91e8c3e57 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -21,12 +21,11 @@ import io.rsocket.transport.ClientTransport; import io.rsocket.transport.ServerTransport; import io.rsocket.transport.local.LocalServerTransport.ServerDuplexConnectionAcceptor; +import java.util.Objects; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.UnicastProcessor; -import java.util.Objects; - /** * An implementation of {@link ClientTransport} that connects to a {@link ServerTransport} in the * same JVM. diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index 0350085c1..a295d2b00 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -18,16 +18,13 @@ import io.netty.buffer.ByteBuf; import io.rsocket.DuplexConnection; -import io.rsocket.frame.FrameHeaderFlyweight; -import io.rsocket.frame.FrameType; +import java.util.Objects; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import java.util.Objects; - /** An implementation of {@link DuplexConnection} that connects inside the same JVM. */ final class LocalDuplexConnection implements DuplexConnection { diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalPingPong.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalPingPong.java index fc1f5a9cb..58a287948 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalPingPong.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalPingPong.java @@ -21,11 +21,10 @@ import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.test.PingClient; import io.rsocket.test.PingHandler; +import java.time.Duration; import org.HdrHistogram.Recorder; import reactor.core.publisher.Mono; -import java.time.Duration; - public final class LocalPingPong { public static void main(String... args) { diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java index cf5d63fed..7184dd645 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java @@ -16,37 +16,31 @@ package io.rsocket.transport.local; -import io.rsocket.test.TransportTest; -import org.junit.jupiter.api.Test; +final class LocalTransportTest { // implements TransportTest { + /* + TODO // think this has a memory leak or something in the local connection now that needs to be checked into. the test + TODO // isn't very happy when run from commandline i the command line + private static final AtomicInteger UNIQUE_NAME_GENERATOR = new AtomicInteger(); -import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; + private final TransportPair transportPair = + new TransportPair<>( + () -> "test" + UNIQUE_NAME_GENERATOR.incrementAndGet(), + (address, server) -> LocalClientTransport.create(address), + LocalServerTransport::create); -final class LocalTransportTest {// implements TransportTest { -/* -TODO // think this has a memory leak or something in the local connection now that needs to be checked into. the test -TODO // isn't very happy when run from commandline i the command line - private static final AtomicInteger UNIQUE_NAME_GENERATOR = new AtomicInteger(); + @Override + @Test + public void requestChannel512() { + + } - private final TransportPair transportPair = - new TransportPair<>( - () -> "test" + UNIQUE_NAME_GENERATOR.incrementAndGet(), - (address, server) -> LocalClientTransport.create(address), - LocalServerTransport::create); - - @Override - @Test - public void requestChannel512() { - - } - - @Override - public Duration getTimeout() { - return Duration.ofSeconds(10); - } + @Override + public Duration getTimeout() { + return Duration.ofSeconds(10); + } - @Override - public TransportPair getTransportPair() { - return transportPair; - }*/ + @Override + public TransportPair getTransportPair() { + return transportPair; + }*/ } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java index 53bea6936..b84201ac9 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/SendPublisher.java @@ -6,6 +6,11 @@ import io.netty.channel.EventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -15,12 +20,6 @@ import reactor.core.publisher.Operators; import reactor.util.concurrent.Queues; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Function; - class SendPublisher extends Flux { private static final AtomicIntegerFieldUpdater WIP = @@ -35,6 +34,7 @@ class SendPublisher extends Flux { private final Publisher source; private final Channel channel; private final EventLoop eventLoop; + private final Queue queue; private final AtomicBoolean completed = new AtomicBoolean(); private final Function transformer; @@ -59,7 +59,10 @@ class SendPublisher extends Flux { @SuppressWarnings("unchecked") SendPublisher( - Publisher source, Channel channel, Function transformer, SizeOf sizeOf) { + Publisher source, + Channel channel, + Function transformer, + SizeOf sizeOf) { this(Queues.small().get(), source, channel, transformer, sizeOf); } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index f51f3ed54..57e3ff0a9 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import io.rsocket.frame.FrameLengthFlyweight; +import java.util.Objects; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.Fuseable; @@ -28,8 +29,6 @@ import reactor.netty.Connection; import reactor.netty.FutureMono; -import java.util.Objects; - /** An implementation of {@link DuplexConnection} that connects via TCP. */ public final class TcpDuplexConnection implements DuplexConnection { @@ -78,11 +77,15 @@ public Mono onClose() { @Override public Flux receive() { - return connection.inbound().receive().map(byteBuf -> { - ByteBuf frame = FrameLengthFlyweight.frame(byteBuf); - frame.retain(); - return frame; - }); + return connection + .inbound() + .receive() + .map( + byteBuf -> { + ByteBuf frame = FrameLengthFlyweight.frame(byteBuf); + frame.retain(); + return frame; + }); } @Override @@ -98,13 +101,17 @@ public Mono send(Publisher frames) { queueSubscription, frameFlux, connection.channel(), - frame -> FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame).retain(), + frame -> + FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame) + .retain(), ByteBuf::readableBytes); } else { return new SendPublisher<>( frameFlux, connection.channel(), - frame -> FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame).retain(), + frame -> + FrameLengthFlyweight.encode(allocator, frame.readableBytes(), frame) + .retain(), ByteBuf::readableBytes); } }) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 210c9fd2e..aa94aa0bb 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -18,6 +18,7 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.rsocket.DuplexConnection; +import java.util.Objects; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.Fuseable; @@ -27,8 +28,6 @@ import reactor.netty.FutureMono; import reactor.util.concurrent.Queues; -import java.util.Objects; - /** * An implementation of {@link DuplexConnection} that connects via a Websocket. * diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/SetupRejectionTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/SetupRejectionTest.java index 062902cf4..f32d28a0b 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/SetupRejectionTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/SetupRejectionTest.java @@ -13,17 +13,16 @@ import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; -import org.junit.jupiter.params.provider.Arguments; -import reactor.core.publisher.EmitterProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - import java.net.InetSocketAddress; import java.time.Duration; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; +import org.junit.jupiter.params.provider.Arguments; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; public class SetupRejectionTest { diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPing.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPing.java index b717afc0b..719c8e2cf 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPing.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPing.java @@ -21,11 +21,10 @@ import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.test.PingClient; import io.rsocket.transport.netty.client.TcpClientTransport; +import java.time.Duration; import org.HdrHistogram.Recorder; import reactor.core.publisher.Mono; -import java.time.Duration; - public final class TcpPing { public static void main(String... args) {