From cec7a789c7a644c20b1c48815e5446c8099a8d13 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 8 May 2020 20:00:56 +0300 Subject: [PATCH] provides extra @NonNullApi annotation for all packages (#826) --- build.gradle | 1 - rsocket-core/build.gradle | 2 -- .../io/rsocket/ConnectionSetupPayload.java | 2 +- .../io/rsocket/core/RSocketConnector.java | 2 +- .../io/rsocket/core/RSocketRequester.java | 2 +- .../io/rsocket/core/RSocketResponder.java | 2 +- .../io/rsocket/core/StreamIdSupplier.java | 1 + .../exceptions/ApplicationErrorException.java | 2 +- .../rsocket/exceptions/CanceledException.java | 2 +- .../exceptions/ConnectionCloseException.java | 2 +- .../exceptions/ConnectionErrorException.java | 2 +- .../exceptions/CustomRSocketException.java | 2 +- .../rsocket/exceptions/InvalidException.java | 2 +- .../exceptions/InvalidSetupException.java | 2 +- .../rsocket/exceptions/RejectedException.java | 2 +- .../exceptions/RejectedResumeException.java | 2 +- .../exceptions/RejectedSetupException.java | 2 +- .../io/rsocket/exceptions/SetupException.java | 2 +- .../exceptions/UnsupportedSetupException.java | 2 +- .../FragmentationDuplexConnection.java | 2 +- .../fragmentation/FrameReassembler.java | 8 +++++-- .../io/rsocket/frame/ExtensionFrameCodec.java | 3 ++- .../java/io/rsocket/frame/FrameBodyCodec.java | 5 ++-- .../io/rsocket/frame/GenericFrameCodec.java | 6 +++-- .../io/rsocket/frame/LeaseFrameCodec.java | 6 ++--- .../rsocket/frame/MetadataPushFrameCodec.java | 4 ++++ .../io/rsocket/frame/PayloadFrameCodec.java | 6 +++-- .../frame/RequestChannelFrameCodec.java | 4 +++- .../frame/RequestFireAndForgetFrameCodec.java | 4 +++- .../frame/RequestResponseFrameCodec.java | 4 +++- .../frame/RequestStreamFrameCodec.java | 4 +++- .../io/rsocket/frame/SetupFrameCodec.java | 6 +++-- .../rsocket/frame/decoder/package-info.java | 24 +++++++++++++++++++ .../java/io/rsocket/frame/package-info.java | 3 +++ .../ClientServerInputMultiplexer.java | 1 + .../io/rsocket/internal/package-info.java | 4 +++- .../io/rsocket/keepalive/package-info.java | 4 +++- .../src/main/java/io/rsocket/lease/Lease.java | 4 +--- .../main/java/io/rsocket/lease/LeaseImpl.java | 4 +--- .../rsocket/lease/MissingLeaseException.java | 7 +++--- .../rsocket/lease/ResponderLeaseHandler.java | 2 +- .../java/io/rsocket/lease/package-info.java | 4 +++- .../io/rsocket/metadata/package-info.java | 3 +++ .../main/java/io/rsocket/package-info.java | 3 +++ .../java/io/rsocket/plugins/package-info.java | 3 +++ .../io/rsocket/resume/SessionManager.java | 2 +- .../java/io/rsocket/resume/package-info.java | 4 +++- .../io/rsocket/transport/package-info.java | 4 +++- .../java/io/rsocket/util/ByteBufPayload.java | 2 +- .../java/io/rsocket/util/DefaultPayload.java | 2 +- .../java/io/rsocket/util/package-info.java | 4 +++- .../core/ConnectionSetupPayloadTest.java | 2 +- .../core/RSocketRequesterSubscribersTest.java | 2 +- .../io/rsocket/frame/LeaseFrameCodecTest.java | 2 +- .../io/rsocket/frame/SetupFrameCodecTest.java | 2 +- .../rsocket/client/filter/package-info.java | 20 ++++++++++++++++ .../java/io/rsocket/client/package-info.java | 20 ++++++++++++++++ .../java/io/rsocket/stat/package-info.java | 20 ++++++++++++++++ rsocket-micrometer/build.gradle | 2 -- rsocket-test/build.gradle | 2 -- rsocket-transport-local/build.gradle | 2 -- rsocket-transport-netty/build.gradle | 2 -- .../netty/client/TcpClientTransport.java | 2 +- .../client/WebsocketClientTransport.java | 8 ++++--- .../netty/server/CloseableChannel.java | 2 +- .../netty/TcpSecureTransportTest.java | 4 ++-- .../netty/WebsocketSecureTransportTest.java | 4 ++-- 67 files changed, 199 insertions(+), 79 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/frame/decoder/package-info.java create mode 100644 rsocket-load-balancer/src/main/java/io/rsocket/client/filter/package-info.java create mode 100644 rsocket-load-balancer/src/main/java/io/rsocket/client/package-info.java create mode 100644 rsocket-load-balancer/src/main/java/io/rsocket/stat/package-info.java diff --git a/build.gradle b/build.gradle index 2c7757e0f..f579b3ae0 100644 --- a/build.gradle +++ b/build.gradle @@ -62,7 +62,6 @@ subprojects { dependencies { dependency "ch.qos.logback:logback-classic:${ext['logback.version']}" - dependency "com.google.code.findbugs:jsr305:${ext['findbugs.version']}" dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}" dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}" dependency "org.assertj:assertj-core:${ext['assertj.version']}" diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle index ca2ae0e65..41adbd7a8 100644 --- a/rsocket-core/build.gradle +++ b/rsocket-core/build.gradle @@ -29,8 +29,6 @@ dependencies { implementation 'org.slf4j:slf4j-api' - compileOnly 'com.google.code.findbugs:jsr305' - testImplementation 'io.projectreactor:reactor-test' testImplementation 'org.assertj:assertj-core' testImplementation 'org.junit.jupiter:junit-jupiter-api' diff --git a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java index bd4582e2b..ece2aa9fa 100644 --- a/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/ConnectionSetupPayload.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.util.AbstractReferenceCounted; import io.rsocket.core.DefaultConnectionSetupPayload; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * Exposes information from the {@code SETUP} frame to a server, as well as to client responders. diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index b69610f3f..38393c27d 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -41,10 +41,10 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nullable; import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.Nullable; import reactor.util.retry.Retry; /** diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 846eaa922..068797d39 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; import java.util.function.Supplier; -import javax.annotation.Nullable; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -68,6 +67,7 @@ import reactor.core.publisher.SignalType; import reactor.core.publisher.UnicastProcessor; import reactor.core.scheduler.Scheduler; +import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; /** diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index b9d3ea794..dce182b49 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -39,7 +39,6 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.function.Supplier; -import javax.annotation.Nullable; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -47,6 +46,7 @@ import reactor.core.Disposable; import reactor.core.Exceptions; import reactor.core.publisher.*; +import reactor.util.annotation.Nullable; import reactor.util.concurrent.Queues; /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ diff --git a/rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java b/rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java index 7f4d7b7b3..15d39c993 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java +++ b/rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java @@ -17,6 +17,7 @@ import io.netty.util.collection.IntObjectMap; +/** This API is not thread-safe and must be strictly used in serialized fashion */ final class StreamIdSupplier { private static final int MASK = 0x7FFFFFFF; diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/ApplicationErrorException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/ApplicationErrorException.java index e617b82d8..cd0d46754 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/ApplicationErrorException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/ApplicationErrorException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * Application layer logic generating a Reactive Streams {@code onError} event. diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/CanceledException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/CanceledException.java index 3c5fc7420..d51ba0fb7 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/CanceledException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/CanceledException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The Responder canceled the request but may have started processing it (similar to REJECTED but diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionCloseException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionCloseException.java index 5cff2c821..80324aa90 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionCloseException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionCloseException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The connection is being terminated. Sender or Receiver of this frame MUST wait for outstanding diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionErrorException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionErrorException.java index 3fcb8f5de..b44714f7e 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionErrorException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/ConnectionErrorException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The connection is being terminated. Sender or Receiver of this frame MAY close the connection diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/CustomRSocketException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/CustomRSocketException.java index 18f488ba0..079b561f9 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/CustomRSocketException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/CustomRSocketException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public class CustomRSocketException extends RSocketException { private static final long serialVersionUID = 7873267740343446585L; diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidException.java index 2428d1e7e..a1b77b8dd 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The request is invalid. diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidSetupException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidSetupException.java index 57da19bb6..b0889c5a6 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidSetupException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/InvalidSetupException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The Setup frame is invalid for the server (it could be that the client is too recent for the old diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedException.java index c87a60243..baed84e1b 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * Despite being a valid request, the Responder decided to reject it. The Responder guarantees that diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedResumeException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedResumeException.java index 8a6ea2244..8a99fcffb 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedResumeException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedResumeException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The server rejected the resume, it can specify the reason in the payload. diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedSetupException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedSetupException.java index 972b430a7..c09a27e32 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedSetupException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/RejectedSetupException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * The server rejected the setup, it can specify the reason in the payload. diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/SetupException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/SetupException.java index 158e5410d..ed979c9e6 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/SetupException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/SetupException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** The root of the setup exception hierarchy. */ public abstract class SetupException extends RSocketException { diff --git a/rsocket-core/src/main/java/io/rsocket/exceptions/UnsupportedSetupException.java b/rsocket-core/src/main/java/io/rsocket/exceptions/UnsupportedSetupException.java index 3282c9750..7429ccd98 100644 --- a/rsocket-core/src/main/java/io/rsocket/exceptions/UnsupportedSetupException.java +++ b/rsocket-core/src/main/java/io/rsocket/exceptions/UnsupportedSetupException.java @@ -17,7 +17,7 @@ package io.rsocket.exceptions; import io.rsocket.frame.ErrorFrameCodec; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * Some (or all) of the parameters specified by the client are unsupported by the server. diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 5192ffead..5d89bb9ad 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -25,12 +25,12 @@ import io.rsocket.frame.FrameLengthCodec; import io.rsocket.frame.FrameType; import java.util.Objects; -import javax.annotation.Nullable; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.annotation.Nullable; /** * A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s. diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java index 1e96bd1fc..52068e5de 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.SynchronousSink; +import reactor.util.annotation.Nullable; /** * The implementation of the RSocket reassembly behavior. @@ -83,6 +84,7 @@ public boolean isDisposed() { return get(); } + @Nullable synchronized ByteBuf getHeader(int streamId) { return headers.get(streamId); } @@ -109,14 +111,17 @@ synchronized CompositeByteBuf getData(int streamId) { return byteBuf; } + @Nullable synchronized ByteBuf removeHeader(int streamId) { return headers.remove(streamId); } + @Nullable synchronized CompositeByteBuf removeMetadata(int streamId) { return metadata.remove(streamId); } + @Nullable synchronized CompositeByteBuf removeData(int streamId) { return data.remove(streamId); } @@ -236,7 +241,6 @@ void reassembleFrame(ByteBuf frame, SynchronousSink sink) { case CANCEL: case ERROR: cancelAssemble(streamId); - default: } if (!frameType.isFragmentable()) { @@ -270,7 +274,7 @@ private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf h metadata = PayloadFrameCodec.metadata(frame).retain(); } } else { - metadata = cm != null ? cm : null; + metadata = cm; } ByteBuf data = assembleData(frame, streamId); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameCodec.java index bf30b9556..418926596 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/ExtensionFrameCodec.java @@ -2,7 +2,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public class ExtensionFrameCodec { private ExtensionFrameCodec() {} @@ -49,6 +49,7 @@ public static ByteBuf data(ByteBuf byteBuf) { return data; } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { FrameHeaderCodec.ensureFrameType(FrameType.EXT, byteBuf); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/FrameBodyCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/FrameBodyCodec.java index 3256d4426..ea011e503 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/FrameBodyCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/FrameBodyCodec.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; +import reactor.util.annotation.Nullable; class FrameBodyCodec { public static final int FRAME_LENGTH_MASK = 0xFFFFFF; @@ -33,9 +34,9 @@ private static int decodeLength(final ByteBuf byteBuf) { static ByteBuf encode( ByteBufAllocator allocator, final ByteBuf header, - ByteBuf metadata, + @Nullable ByteBuf metadata, boolean hasMetadata, - ByteBuf data) { + @Nullable ByteBuf data) { final boolean addData; if (data != null) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/GenericFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/GenericFrameCodec.java index 65e7eeeea..56a93d869 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/GenericFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/GenericFrameCodec.java @@ -4,7 +4,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.IllegalReferenceCountException; import io.rsocket.Payload; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; class GenericFrameCodec { @@ -75,7 +75,7 @@ static ByteBuf encode( boolean next, int requestN, @Nullable ByteBuf metadata, - ByteBuf data) { + @Nullable ByteBuf data) { final boolean hasMetadata = metadata != null; @@ -115,6 +115,7 @@ static ByteBuf data(ByteBuf byteBuf) { return data; } + @Nullable static ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderCodec.hasMetadata(byteBuf); if (!hasMetadata) { @@ -136,6 +137,7 @@ static ByteBuf dataWithRequestN(ByteBuf byteBuf) { return data; } + @Nullable static ByteBuf metadataWithRequestN(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderCodec.hasMetadata(byteBuf); if (!hasMetadata) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameCodec.java index cafd80104..f20c25d3b 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/LeaseFrameCodec.java @@ -2,8 +2,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public class LeaseFrameCodec { @@ -67,6 +66,7 @@ public static int numRequests(final ByteBuf byteBuf) { return numRequests; } + @Nullable public static ByteBuf metadata(final ByteBuf byteBuf) { FrameHeaderCodec.ensureFrameType(FrameType.LEASE, byteBuf); if (FrameHeaderCodec.hasMetadata(byteBuf)) { @@ -77,7 +77,7 @@ public static ByteBuf metadata(final ByteBuf byteBuf) { byteBuf.resetReaderIndex(); return metadata; } else { - return Unpooled.EMPTY_BUFFER; + return null; } } } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameCodec.java index 62c8a17dc..d8ffe3eef 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/MetadataPushFrameCodec.java @@ -8,6 +8,10 @@ public class MetadataPushFrameCodec { public static ByteBuf encodeReleasingPayload(ByteBufAllocator allocator, Payload payload) { + if (!payload.hasMetadata()) { + throw new IllegalStateException( + "Metadata push requires to have metadata present" + " in the given Payload"); + } final ByteBuf metadata = payload.metadata().retain(); // releasing payload safely since it can be already released wheres we have to release retained // data and metadata as well diff --git a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameCodec.java index 8a7e6427a..1ae9c6750 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/PayloadFrameCodec.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; +import reactor.util.annotation.Nullable; public class PayloadFrameCodec { @@ -37,8 +38,8 @@ public static ByteBuf encode( boolean fragmentFollows, boolean complete, boolean next, - ByteBuf metadata, - ByteBuf data) { + @Nullable ByteBuf metadata, + @Nullable ByteBuf data) { return GenericFrameCodec.encode( allocator, FrameType.PAYLOAD, streamId, fragmentFollows, complete, next, 0, metadata, data); @@ -48,6 +49,7 @@ public static ByteBuf data(ByteBuf byteBuf) { return GenericFrameCodec.data(byteBuf); } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { return GenericFrameCodec.metadata(byteBuf); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameCodec.java index 2ff887043..60906083d 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestChannelFrameCodec.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; +import reactor.util.annotation.Nullable; public class RequestChannelFrameCodec { @@ -31,7 +32,7 @@ public static ByteBuf encode( boolean fragmentFollows, boolean complete, long initialRequestN, - ByteBuf metadata, + @Nullable ByteBuf metadata, ByteBuf data) { if (initialRequestN < 1) { @@ -56,6 +57,7 @@ public static ByteBuf data(ByteBuf byteBuf) { return GenericFrameCodec.dataWithRequestN(byteBuf); } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { return GenericFrameCodec.metadataWithRequestN(byteBuf); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameCodec.java index ddb5bc5d7..b91199179 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestFireAndForgetFrameCodec.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; +import reactor.util.annotation.Nullable; public class RequestFireAndForgetFrameCodec { @@ -19,7 +20,7 @@ public static ByteBuf encode( ByteBufAllocator allocator, int streamId, boolean fragmentFollows, - ByteBuf metadata, + @Nullable ByteBuf metadata, ByteBuf data) { return GenericFrameCodec.encode( @@ -30,6 +31,7 @@ public static ByteBuf data(ByteBuf byteBuf) { return GenericFrameCodec.data(byteBuf); } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { return GenericFrameCodec.metadata(byteBuf); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameCodec.java index 884a8293b..4a37acfd5 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestResponseFrameCodec.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; +import reactor.util.annotation.Nullable; public class RequestResponseFrameCodec { @@ -19,7 +20,7 @@ public static ByteBuf encode( ByteBufAllocator allocator, int streamId, boolean fragmentFollows, - ByteBuf metadata, + @Nullable ByteBuf metadata, ByteBuf data) { return GenericFrameCodec.encode( allocator, FrameType.REQUEST_RESPONSE, streamId, fragmentFollows, metadata, data); @@ -29,6 +30,7 @@ public static ByteBuf data(ByteBuf byteBuf) { return GenericFrameCodec.data(byteBuf); } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { return GenericFrameCodec.metadata(byteBuf); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameCodec.java index 9853a8b54..2f5dbf0d8 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/RequestStreamFrameCodec.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; +import reactor.util.annotation.Nullable; public class RequestStreamFrameCodec { @@ -26,7 +27,7 @@ public static ByteBuf encode( int streamId, boolean fragmentFollows, long initialRequestN, - ByteBuf metadata, + @Nullable ByteBuf metadata, ByteBuf data) { if (initialRequestN < 1) { @@ -51,6 +52,7 @@ public static ByteBuf data(ByteBuf byteBuf) { return GenericFrameCodec.dataWithRequestN(byteBuf); } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { return GenericFrameCodec.metadataWithRequestN(byteBuf); } diff --git a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameCodec.java b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameCodec.java index d6f7431e4..547e2436e 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameCodec.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/SetupFrameCodec.java @@ -6,6 +6,7 @@ import io.netty.buffer.Unpooled; import io.rsocket.Payload; import java.nio.charset.StandardCharsets; +import reactor.util.annotation.Nullable; public class SetupFrameCodec { /** @@ -61,7 +62,7 @@ public static ByteBuf encode( int flags = 0; - if (resumeToken != null && resumeToken.readableBytes() > 0) { + if (resumeToken.readableBytes() > 0) { flags |= FLAGS_RESUME_ENABLE; } @@ -163,7 +164,7 @@ public static ByteBuf resumeToken(ByteBuf byteBuf) { byteBuf.resetReaderIndex(); return resumeToken; } else { - return null; + return Unpooled.EMPTY_BUFFER; } } @@ -186,6 +187,7 @@ public static String dataMimeType(ByteBuf byteBuf) { return mimeType; } + @Nullable public static ByteBuf metadata(ByteBuf byteBuf) { boolean hasMetadata = FrameHeaderCodec.hasMetadata(byteBuf); if (!hasMetadata) { diff --git a/rsocket-core/src/main/java/io/rsocket/frame/decoder/package-info.java b/rsocket-core/src/main/java/io/rsocket/frame/decoder/package-info.java new file mode 100644 index 000000000..82e8acaf3 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/frame/decoder/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Support for encoding and decoding of RSocket frames to and from {@link io.rsocket.Payload + * Payload}. + */ +@NonNullApi +package io.rsocket.frame.decoder; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/frame/package-info.java b/rsocket-core/src/main/java/io/rsocket/frame/package-info.java index 1d02ebca0..69f6d6860 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/package-info.java @@ -18,4 +18,7 @@ * Support for encoding and decoding of RSocket frames to and from {@link io.rsocket.Payload * Payload}. */ +@NonNullApi package io.rsocket.frame; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index c294d6539..0d24c51d8 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -161,6 +161,7 @@ private static class InternalDuplexConnection implements DuplexConnection { private final MonoProcessor>[] processors; private final boolean debugEnabled; + @SafeVarargs public InternalDuplexConnection( DuplexConnection source, MonoProcessor>... processors) { this.source = source; diff --git a/rsocket-core/src/main/java/io/rsocket/internal/package-info.java b/rsocket-core/src/main/java/io/rsocket/internal/package-info.java index 09918f3d1..07ddfab41 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/package-info.java @@ -18,5 +18,7 @@ * Internal package and must not be used outside this project. There are no guarantees for * API compatibility. */ -@javax.annotation.ParametersAreNonnullByDefault +@NonNullApi package io.rsocket.internal; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/package-info.java b/rsocket-core/src/main/java/io/rsocket/keepalive/package-info.java index ce8a2f3fb..d94a93cad 100644 --- a/rsocket-core/src/main/java/io/rsocket/keepalive/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/package-info.java @@ -15,5 +15,7 @@ */ /** Support classes for sending and keeping track of KEEPALIVE frames from the remote. */ -@javax.annotation.ParametersAreNonnullByDefault +@NonNullApi package io.rsocket.keepalive; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/lease/Lease.java b/rsocket-core/src/main/java/io/rsocket/lease/Lease.java index b9d99f88a..673b4a480 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/Lease.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/Lease.java @@ -19,8 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.rsocket.Availability; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** A contract for RSocket lease, which is sent by a request acceptor and is time bound. */ public interface Lease extends Availability { @@ -78,7 +77,6 @@ default int getRemainingTimeToLiveMillis(long now) { * * @return Metadata for the lease. */ - @Nonnull ByteBuf getMetadata(); /** diff --git a/rsocket-core/src/main/java/io/rsocket/lease/LeaseImpl.java b/rsocket-core/src/main/java/io/rsocket/lease/LeaseImpl.java index 63b0433cb..7abb8aab9 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/LeaseImpl.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/LeaseImpl.java @@ -19,8 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public class LeaseImpl implements Lease { private final int timeToLiveMillis; @@ -60,7 +59,6 @@ public int getStartingAllowedRequests() { return startingAllowedRequests; } - @Nonnull @Override public ByteBuf getMetadata() { return metadata; diff --git a/rsocket-core/src/main/java/io/rsocket/lease/MissingLeaseException.java b/rsocket-core/src/main/java/io/rsocket/lease/MissingLeaseException.java index 734d16d07..3b6cec62c 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/MissingLeaseException.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/MissingLeaseException.java @@ -17,17 +17,16 @@ import io.rsocket.exceptions.RejectedException; import java.util.Objects; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public class MissingLeaseException extends RejectedException { private static final long serialVersionUID = -6169748673403858959L; - public MissingLeaseException(@Nonnull Lease lease, @Nonnull String tag) { + public MissingLeaseException(Lease lease, String tag) { super(leaseMessage(Objects.requireNonNull(lease), Objects.requireNonNull(tag))); } - public MissingLeaseException(@Nonnull String tag) { + public MissingLeaseException(String tag) { super(leaseMessage(null, Objects.requireNonNull(tag))); } diff --git a/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java b/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java index 5f000cb30..2035ade87 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java @@ -23,10 +23,10 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; -import javax.annotation.Nullable; import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; +import reactor.util.annotation.Nullable; public interface ResponderLeaseHandler extends Availability { diff --git a/rsocket-core/src/main/java/io/rsocket/lease/package-info.java b/rsocket-core/src/main/java/io/rsocket/lease/package-info.java index ce1956628..342ab27f7 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/package-info.java @@ -21,5 +21,7 @@ * href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#resuming-operation">Resuming * Operation */ -@javax.annotation.ParametersAreNonnullByDefault +@NonNullApi package io.rsocket.lease; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/metadata/package-info.java b/rsocket-core/src/main/java/io/rsocket/metadata/package-info.java index b1bc45ff0..3fb9ae1d6 100644 --- a/rsocket-core/src/main/java/io/rsocket/metadata/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/metadata/package-info.java @@ -19,4 +19,7 @@ * href="https://github.com/rsocket/rsocket/tree/master/Extensions">protocol extensions related * to the use of metadata. */ +@NonNullApi package io.rsocket.metadata; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/package-info.java b/rsocket-core/src/main/java/io/rsocket/package-info.java index 878a56301..6fe74fb38 100644 --- a/rsocket-core/src/main/java/io/rsocket/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/package-info.java @@ -23,4 +23,7 @@ *

To connect to or start a server see {@link io.rsocket.core.RSocketConnector RSocketConnector} * and {@link io.rsocket.core.RSocketServer RSocketServer} in {@link io.rsocket.core}. */ +@NonNullApi package io.rsocket; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/plugins/package-info.java b/rsocket-core/src/main/java/io/rsocket/plugins/package-info.java index 743e3a8a4..fd9e1f01a 100644 --- a/rsocket-core/src/main/java/io/rsocket/plugins/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/plugins/package-info.java @@ -15,4 +15,7 @@ */ /** Contracts for interception of transports, connections, and requests in in RSocket Java. */ +@NonNullApi package io.rsocket.plugins; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/resume/SessionManager.java b/rsocket-core/src/main/java/io/rsocket/resume/SessionManager.java index 3882103a0..1d5c23bd6 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/SessionManager.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/SessionManager.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public class SessionManager { private volatile boolean isDisposed; diff --git a/rsocket-core/src/main/java/io/rsocket/resume/package-info.java b/rsocket-core/src/main/java/io/rsocket/resume/package-info.java index aaaa3ee9f..98744386a 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/package-info.java @@ -21,5 +21,7 @@ * href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#resuming-operation">Resuming * Operation */ -@javax.annotation.ParametersAreNonnullByDefault +@NonNullApi package io.rsocket.resume; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/transport/package-info.java b/rsocket-core/src/main/java/io/rsocket/transport/package-info.java index 153676324..00536122a 100644 --- a/rsocket-core/src/main/java/io/rsocket/transport/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/transport/package-info.java @@ -15,5 +15,7 @@ */ /** Client and server transport contracts for pluggable transports. */ -@javax.annotation.ParametersAreNonnullByDefault +@NonNullApi package io.rsocket.transport; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java index f5d747f7f..4cf33fa86 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java @@ -28,7 +28,7 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; public final class ByteBufPayload extends AbstractReferenceCounted implements Payload { private static final Recycler RECYCLER = diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index ec73399f1..58f282110 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -23,7 +23,7 @@ import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import javax.annotation.Nullable; +import reactor.util.annotation.Nullable; /** * An implementation of {@link Payload}. This implementation is not thread-safe, and hence diff --git a/rsocket-core/src/main/java/io/rsocket/util/package-info.java b/rsocket-core/src/main/java/io/rsocket/util/package-info.java index e034672f1..2fac3327f 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/package-info.java +++ b/rsocket-core/src/main/java/io/rsocket/util/package-info.java @@ -15,5 +15,7 @@ */ /** Shared utility classes and {@link io.rsocket.Payload} implementations. */ -@javax.annotation.ParametersAreNonnullByDefault +@NonNullApi package io.rsocket.util; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java index 9d8b8354a..8eb5dee09 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ConnectionSetupPayloadTest.java @@ -82,7 +82,7 @@ private static ByteBuf encodeSetupFrame(boolean leaseEnabled, Payload setupPaylo leaseEnabled, KEEP_ALIVE_INTERVAL, KEEP_ALIVE_MAX_LIFETIME, - null, + Unpooled.EMPTY_BUFFER, METADATA_TYPE, DATA_TYPE, setupPayload); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java index 6f1ecf98b..00f74152a 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java @@ -143,6 +143,6 @@ static Stream>> allInteractions() { rSocket -> rSocket.requestResponse(DefaultPayload.create("test")), rSocket -> rSocket.requestStream(DefaultPayload.create("test")), // rSocket -> rSocket.requestChannel(Mono.just(DefaultPayload.create("test"))), - rSocket -> rSocket.metadataPush(DefaultPayload.create("test"))); + rSocket -> rSocket.metadataPush(DefaultPayload.create("", "test"))); } } diff --git a/rsocket-core/src/test/java/io/rsocket/frame/LeaseFrameCodecTest.java b/rsocket-core/src/test/java/io/rsocket/frame/LeaseFrameCodecTest.java index 448b5003b..73c3bde5e 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/LeaseFrameCodecTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/LeaseFrameCodecTest.java @@ -32,7 +32,7 @@ void leaseAbsentMetadata() { Assertions.assertFalse(FrameHeaderCodec.hasMetadata(lease)); Assertions.assertEquals(ttl, LeaseFrameCodec.ttl(lease)); Assertions.assertEquals(numRequests, LeaseFrameCodec.numRequests(lease)); - Assertions.assertEquals(0, LeaseFrameCodec.metadata(lease).readableBytes()); + Assertions.assertNull(LeaseFrameCodec.metadata(lease)); lease.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java b/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java index f7d649972..9607ad327 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java @@ -22,7 +22,7 @@ void testEncodingNoResume() { assertEquals(FrameType.SETUP, FrameHeaderCodec.frameType(frame)); assertFalse(SetupFrameCodec.resumeEnabled(frame)); - assertNull(SetupFrameCodec.resumeToken(frame)); + assertEquals(0, SetupFrameCodec.resumeToken(frame).readableBytes()); assertEquals("metadata_type", SetupFrameCodec.metadataMimeType(frame)); assertEquals("data_type", SetupFrameCodec.dataMimeType(frame)); assertEquals(metadata, SetupFrameCodec.metadata(frame)); diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/package-info.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/package-info.java new file mode 100644 index 000000000..55ce5646c --- /dev/null +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/filter/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@NonNullApi +package io.rsocket.client.filter; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/client/package-info.java b/rsocket-load-balancer/src/main/java/io/rsocket/client/package-info.java new file mode 100644 index 000000000..ec21dee96 --- /dev/null +++ b/rsocket-load-balancer/src/main/java/io/rsocket/client/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@NonNullApi +package io.rsocket.client; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-load-balancer/src/main/java/io/rsocket/stat/package-info.java b/rsocket-load-balancer/src/main/java/io/rsocket/stat/package-info.java new file mode 100644 index 000000000..cfb071175 --- /dev/null +++ b/rsocket-load-balancer/src/main/java/io/rsocket/stat/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@NonNullApi +package io.rsocket.stat; + +import reactor.util.annotation.NonNullApi; diff --git a/rsocket-micrometer/build.gradle b/rsocket-micrometer/build.gradle index 5f2aeb16f..4be616623 100644 --- a/rsocket-micrometer/build.gradle +++ b/rsocket-micrometer/build.gradle @@ -27,8 +27,6 @@ dependencies { implementation 'org.slf4j:slf4j-api' - compileOnly 'com.google.code.findbugs:jsr305' - testImplementation project(':rsocket-test') testImplementation 'io.projectreactor:reactor-test' testImplementation 'org.assertj:assertj-core' diff --git a/rsocket-test/build.gradle b/rsocket-test/build.gradle index 3009b5135..5ec1a8061 100644 --- a/rsocket-test/build.gradle +++ b/rsocket-test/build.gradle @@ -26,8 +26,6 @@ dependencies { api 'org.hdrhistogram:HdrHistogram' api 'org.junit.jupiter:junit-jupiter-api' - compileOnly 'com.google.code.findbugs:jsr305' - implementation 'io.projectreactor:reactor-test' implementation 'org.assertj:assertj-core' implementation 'org.mockito:mockito-core' diff --git a/rsocket-transport-local/build.gradle b/rsocket-transport-local/build.gradle index 8c3226065..a5ba84d5c 100644 --- a/rsocket-transport-local/build.gradle +++ b/rsocket-transport-local/build.gradle @@ -24,8 +24,6 @@ plugins { dependencies { api project(':rsocket-core') - compileOnly 'com.google.code.findbugs:jsr305' - testImplementation project(':rsocket-test') testImplementation 'io.projectreactor:reactor-test' testImplementation 'org.assertj:assertj-core' diff --git a/rsocket-transport-netty/build.gradle b/rsocket-transport-netty/build.gradle index 0aac12d5c..64e483c90 100644 --- a/rsocket-transport-netty/build.gradle +++ b/rsocket-transport-netty/build.gradle @@ -32,8 +32,6 @@ dependencies { api 'io.projectreactor.netty:reactor-netty' api 'org.slf4j:slf4j-api' - compileOnly 'com.google.code.findbugs:jsr305' - testImplementation project(':rsocket-test') testImplementation 'io.projectreactor:reactor-test' testImplementation 'org.assertj:assertj-core' diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java index 8be019f1c..22f139310 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java @@ -75,7 +75,7 @@ public static TcpClientTransport create(String bindAddress, int port) { public static TcpClientTransport create(InetSocketAddress address) { Objects.requireNonNull(address, "address must not be null"); - TcpClient tcpClient = TcpClient.create().addressSupplier(() -> address); + TcpClient tcpClient = TcpClient.create().remoteAddress(() -> address); return create(tcpClient); } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java index 6991728ca..747401210 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java @@ -35,6 +35,7 @@ import java.util.function.Supplier; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.WebsocketClientSpec; import reactor.netty.tcp.TcpClient; /** @@ -47,7 +48,7 @@ public final class WebsocketClientTransport implements ClientTransport, Transpor private final HttpClient client; - private String path; + private final String path; private Supplier> transportHeaders = Collections::emptyMap; @@ -92,7 +93,7 @@ public static WebsocketClientTransport create(String bindAddress, int port) { public static WebsocketClientTransport create(InetSocketAddress address) { Objects.requireNonNull(address, "address must not be null"); - TcpClient client = TcpClient.create().addressSupplier(() -> address); + TcpClient client = TcpClient.create().remoteAddress(() -> address); return create(client); } @@ -155,7 +156,8 @@ public Mono connect(int mtu) { ? isError : client .headers(headers -> transportHeaders.get().forEach(headers::set)) - .websocket(FRAME_LENGTH_MASK) + .websocket( + WebsocketClientSpec.builder().maxFramePayloadLength(FRAME_LENGTH_MASK).build()) .uri(path) .connect() .map( diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java index f6e83bc36..c0340c7a2 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java @@ -28,7 +28,7 @@ */ public final class CloseableChannel implements Closeable { - private DisposableChannel channel; + private final DisposableChannel channel; /** * Creates a new instance diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java index b77de6d4e..95bebd6aa 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpSecureTransportTest.java @@ -20,7 +20,7 @@ public class TcpSecureTransportTest implements TransportTest { (address, server) -> TcpClientTransport.create( TcpClient.create() - .addressSupplier(server::address) + .remoteAddress(server::address) .secure( ssl -> ssl.sslContext( @@ -31,7 +31,7 @@ public class TcpSecureTransportTest implements TransportTest { SelfSignedCertificate ssc = new SelfSignedCertificate(); TcpServer server = TcpServer.create() - .addressSupplier(() -> address) + .bindAddress(() -> address) .secure( ssl -> ssl.sslContext( diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java index c1d608979..ec33060b2 100644 --- a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketSecureTransportTest.java @@ -38,7 +38,7 @@ final class WebsocketSecureTransportTest implements TransportTest { (address, server) -> WebsocketClientTransport.create( HttpClient.create() - .addressSupplier(server::address) + .remoteAddress(server::address) .secure( ssl -> ssl.sslContext( @@ -53,7 +53,7 @@ final class WebsocketSecureTransportTest implements TransportTest { HttpServer server = HttpServer.from( TcpServer.create() - .addressSupplier(() -> address) + .bindAddress(() -> address) .secure( ssl -> ssl.sslContext(