From cd6a04bd83882b58dc83347309afeb720bdebb89 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Fri, 11 May 2018 22:14:51 +0300 Subject: [PATCH 1/2] * Lease support * Do not expose full Setup frame contents to server acceptor * Fix fragmentation of frames with null metadata --- build.gradle | 5 - .../main/kotlin/io/rsocket/kotlin/Frame.kt | 10 +- .../io/rsocket/kotlin/KeepAliveOptions.kt | 4 + .../main/kotlin/io/rsocket/kotlin/LeaseRef.kt | 18 ++ .../io/rsocket/kotlin/MediaTypeOptions.kt | 4 + .../io/rsocket/kotlin/RSocketFactory.kt | 161 ++++++++++++++---- .../main/kotlin/io/rsocket/kotlin/Setup.kt | 64 +------ .../exceptions/MissingLeaseException.kt | 18 ++ .../kotlin/exceptions/RejectedException.kt | 2 +- .../kotlin/internal/ConnectionDemuxer.kt | 24 +-- .../kotlin/internal/InterceptorRegistry.kt | 41 ++++- .../internal/ServerContractInterceptor.kt | 6 +- .../rsocket/kotlin/internal/ServiceHandler.kt | 3 +- .../rsocket/kotlin/internal/SetupContents.kt | 46 +++++ .../fragmentation/StreamFramesReassembler.kt | 12 +- .../internal/frame/SetupFrameFlyweight.kt | 3 + .../internal/lease/ConnectionLeaseRef.kt | 67 ++++++++ .../io/rsocket/kotlin/internal/lease/Lease.kt | 71 ++++++++ .../kotlin/internal/lease/LeaseContext.kt | 24 +++ .../lease/LeaseEnablingInterceptor.kt | 55 ++++++ .../kotlin/internal/lease/LeaseGranter.kt | 26 +++ .../internal/lease/LeaseGranterConnection.kt | 77 +++++++++ .../internal/lease/LeaseGranterInterceptor.kt | 45 +++++ .../kotlin/internal/lease/LeaseImpl.kt | 74 ++++++++ .../kotlin/internal/lease/LeaseInterceptor.kt | 33 ++++ .../kotlin/internal/lease/LeaseManager.kt | 67 ++++++++ .../kotlin/internal/lease/LeaseRSocket.kt | 95 +++++++++++ .../kotlin/internal/lease/LeaseSupport.kt | 91 ++++++++++ .../io/rsocket/kotlin/ServiceHandlerTest.kt | 9 - .../io/rsocket/kotlin/frame/SetupFrameTest.kt | 6 +- .../StreamFramesReassemblerTest.kt | 20 ++- .../lease/LeaseGranterConnectionTest.kt | 74 ++++++++ .../kotlin/internal/lease/LeaseManagerTest.kt | 65 +++++++ .../kotlin/internal/lease/LeaseRSocketTest.kt | 88 ++++++++++ .../kotlin/test/lease/LeaseServerTest.kt | 118 +++++++++++++ .../lease/example/LeaseClientServerExample.kt | 86 ++++++++++ .../ClientServerChannelTest.kt | 2 +- .../test/{ => transport}/EndToEndTest.kt | 2 +- .../{ => transport}/NettyTcpEndToEndTest.kt | 2 +- .../NettyWebsocketEndToEndTest.kt | 3 +- .../OkHttpNettyEndToEndTest.kt | 3 +- 41 files changed, 1474 insertions(+), 150 deletions(-) create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt create mode 100644 rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt create mode 100644 rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt create mode 100644 rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt create mode 100644 test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt create mode 100644 test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt rename test/src/test/kotlin/io/rsocket/kotlin/test/{ => transport}/ClientServerChannelTest.kt (98%) rename test/src/test/kotlin/io/rsocket/kotlin/test/{ => transport}/EndToEndTest.kt (99%) rename test/src/test/kotlin/io/rsocket/kotlin/test/{ => transport}/NettyTcpEndToEndTest.kt (88%) rename test/src/test/kotlin/io/rsocket/kotlin/test/{ => transport}/NettyWebsocketEndToEndTest.kt (83%) rename test/src/test/kotlin/io/rsocket/kotlin/test/{ => transport}/OkHttpNettyEndToEndTest.kt (85%) diff --git a/build.gradle b/build.gradle index aa262e931..c2f2263f8 100644 --- a/build.gradle +++ b/build.gradle @@ -109,11 +109,6 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey')) { gpg { sign = true } - - mavenCentralSync { - user = project.property('sonatypeUsername') - password = project.property('sonatypePassword') - } } } } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt index 5921cce41..8a51688a4 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt @@ -304,9 +304,11 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold fun leaseEnabled(frame: Frame): Boolean { ensureFrameType(FrameType.SETUP, frame) - return Frame.isFlagSet( - frame.flags(), - SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE) + return SetupFrameFlyweight.supportsLease(frame.flags()) + } + + fun enableLease(flags: Int): Int { + return flags or SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE } fun keepaliveInterval(frame: Frame): Int { @@ -594,7 +596,7 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold object Fragmentation { fun assembleFrame(blueprintFrame: Frame, - metadata: ByteBuf, + metadata: ByteBuf?, data: ByteBuf): Frame = create(blueprintFrame, diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt index ab92825d3..8115cad7d 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt @@ -20,6 +20,10 @@ class KeepAliveOptions : KeepAlive { override fun keepAliveMaxLifeTime() = maxLifeTime + fun copy(): KeepAliveOptions = KeepAliveOptions() + .keepAliveInterval(interval) + .keepAliveMaxLifeTime(maxLifeTime) + private fun assertDuration(duration: Duration, name: String) { if (duration.millis <= 0) { throw IllegalArgumentException("$name must be positive") diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt new file mode 100644 index 000000000..e97d63673 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt @@ -0,0 +1,18 @@ +package io.rsocket.kotlin + +import io.reactivex.Completable +import java.nio.ByteBuffer + +/** Provides means to grant lease to peer */ +interface LeaseRef { + + fun grantLease( + numberOfRequests: Int, + ttlMillis: Long, + metadata: ByteBuffer): Completable + + fun grantLease(numberOfRequests: Int, + timeToLiveMillis: Long): Completable + + fun onClose(): Completable +} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt index 01a6d7bab..8fd0637ea 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt @@ -20,6 +20,10 @@ class MediaTypeOptions : MediaType { override fun metadataMimeType(): String = metadataMimeType + fun copy(): MediaTypeOptions = MediaTypeOptions() + .dataMimeType(dataMimeType) + .metadataMimeType(metadataMimeType) + private fun assertMediaType(mediaType: String) { if (mediaType.isEmpty()) { throw IllegalArgumentException("media type must be non-empty") diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt index 4a682886e..ffaac747d 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt @@ -21,6 +21,8 @@ import io.reactivex.Single import io.rsocket.kotlin.interceptors.GlobalInterceptors import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.internal.fragmentation.FragmentationInterceptor +import io.rsocket.kotlin.internal.lease.ClientLeaseSupport +import io.rsocket.kotlin.internal.lease.ServerLeaseSupport import io.rsocket.kotlin.transport.ClientTransport import io.rsocket.kotlin.transport.ServerTransport import io.rsocket.kotlin.util.AbstractRSocket @@ -47,6 +49,7 @@ object RSocketFactory { private var acceptor: ClientAcceptor = { { emptyRSocket } } private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() } private var mtu = 0 + private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null private val interceptors = GlobalInterceptors.create() private var flags = 0 private var setupPayload: Payload = DefaultPayload.EMPTY @@ -76,6 +79,12 @@ object RSocketFactory { return this } + fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory { + this.flags = Frame.Setup.enableLease(flags) + this.leaseRefConsumer = leaseRefConsumer + return this + } + fun errorConsumer(errorConsumer: (Throwable) -> Unit): ClientRSocketFactory { this.errorConsumer = errorConsumer return this @@ -93,36 +102,65 @@ object RSocketFactory { } fun transport(transport: () -> ClientTransport): Start = - ClientStart(transport, interceptors()) + clientStart(acceptor, transport) + + fun transport(transport: ClientTransport): Start = + transport { transport } fun acceptor(acceptor: ClientAcceptor): ClientTransportAcceptor { this.acceptor = acceptor return object : ClientTransportAcceptor { override fun transport(transport: () -> ClientTransport) - : Start = - ClientStart(transport, interceptors()) + : Start = clientStart(acceptor, transport) } } - private fun interceptors(): InterceptorRegistry = - interceptors.copyWith { - it.connectionFirst( - FragmentationInterceptor(mtu)) - } + private fun clientStart(acceptor: ClientAcceptor, + transport: () -> ClientTransport): ClientStart = - private inner class ClientStart(private val transportClient: () -> ClientTransport, - private val interceptors: InterceptorRegistry) + ClientStart(acceptor, + errorConsumer, + mtu, + leaseRefConsumer, + flags, + setupPayload, + keepAlive.copy(), + mediaType.copy(), + streamRequestLimit, + transport, + interceptors.copy()) + + private class ClientStart( + private val acceptor: ClientAcceptor, + private val errorConsumer: (Throwable) -> Unit, + private var mtu: Int, + private val leaseRef: ((LeaseRef) -> Unit)?, + private val flags: Int, + private val setupPayload: Payload, + private val keepAlive: KeepAlive, + private val mediaType: MediaType, + private val streamRequestLimit: Int, + private val transportClient: () -> ClientTransport, + private val parentInterceptors: InterceptorRegistry) : Start { override fun start(): Single { return transportClient() .connect() .flatMap { connection -> - val setupFrame = createSetupFrame() + + val withLease = + enableLease(parentInterceptors) + + val interceptors = + enableFragmentation(withLease) + + val interceptConnection = interceptors as InterceptConnection + val interceptRSocket = interceptors as InterceptRSocket val demuxer = ClientConnectionDemuxer( connection, - interceptors) + interceptConnection) val rSocketRequester = RSocketRequester( demuxer.requesterConnection(), @@ -130,12 +168,12 @@ object RSocketFactory { ClientStreamIds(), streamRequestLimit) - val wrappedRequester = interceptors + val wrappedRequester = interceptRSocket .interceptRequester(rSocketRequester) val handlerRSocket = acceptor()(wrappedRequester) - val wrappedHandler = interceptors + val wrappedHandler = interceptRSocket .interceptHandler(handlerRSocket) RSocketResponder( @@ -149,12 +187,21 @@ object RSocketFactory { keepAlive, errorConsumer) + val setupFrame = createSetupFrame() + connection .sendOne(setupFrame) .andThen(Single.just(wrappedRequester)) } } + private fun enableFragmentation(parentInterceptors: InterceptorRegistry) + : InterceptorRegistry { + parentInterceptors.connectionFirst( + FragmentationInterceptor(mtu)) + return parentInterceptors + } + private fun createSetupFrame(): Frame { return Frame.Setup.from( flags, @@ -164,14 +211,23 @@ object RSocketFactory { mediaType.dataMimeType(), setupPayload) } + + private fun enableLease(parentInterceptors: InterceptorRegistry) + : InterceptorRegistry = + if (leaseRef != null) { + parentInterceptors.copyWith( + ClientLeaseSupport.enable(leaseRef)()) + } else { + parentInterceptors.copy() + } } } class ServerRSocketFactory internal constructor() { - private var acceptor: ServerAcceptor = { { _, _ -> Single.just(emptyRSocket) } } private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() } private var mtu = 0 + private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null private val interceptors = GlobalInterceptors.create() private var streamRequestLimit = defaultStreamRequestLimit @@ -186,6 +242,11 @@ object RSocketFactory { return this } + fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory { + this.leaseRefConsumer = leaseRefConsumer + return this + } + fun errorConsumer(errorConsumer: (Throwable) -> Unit): ServerRSocketFactory { this.errorConsumer = errorConsumer return this @@ -197,26 +258,28 @@ object RSocketFactory { } fun acceptor(acceptor: ServerAcceptor): ServerTransportAcceptor { - this.acceptor = acceptor return object : ServerTransportAcceptor { + override fun transport( transport: () -> ServerTransport): Start = - ServerStart(transport, interceptors()) - } - } - - private fun interceptors(): InterceptorRegistry { - return interceptors.copyWith { - it.connectionFirst( - ServerContractInterceptor(errorConsumer)) - it.connectionFirst( - FragmentationInterceptor(mtu)) + ServerStart(transport, + acceptor, + errorConsumer, + mtu, + leaseRefConsumer, + interceptors.copy(), + streamRequestLimit) } } - private inner class ServerStart( + private class ServerStart( private val transportServer: () -> ServerTransport, - private val interceptors: InterceptorRegistry) : Start { + private val acceptor: ServerAcceptor, + private val errorConsumer: (Throwable) -> Unit, + private val mtu: Int, + private val leaseRef: ((LeaseRef) -> Unit)?, + private val parentInterceptors: InterceptorRegistry, + private val streamRequestLimit: Int) : Start { override fun start(): Single { return transportServer().start(object @@ -225,25 +288,37 @@ object RSocketFactory { override fun invoke(duplexConnection: DuplexConnection) : Completable { + val withLease = + enableLease(parentInterceptors) + + val withServerContract = + enableServerContract(withLease) + + val interceptors = + enableFragmentation(withServerContract) + val demuxer = ServerConnectionDemuxer( duplexConnection, - interceptors) + interceptors as InterceptConnection) return demuxer .setupConnection() .receive() .firstOrError() .flatMapCompletable { setup -> - accept(setup, demuxer) + accept(setup, + interceptors as InterceptRSocket, + demuxer) } } }) } private fun accept(setupFrame: Frame, + interceptors: InterceptRSocket, demuxer: ConnectionDemuxer): Completable { - val setup = Setup.create(setupFrame) + val setup = SetupContents.create(setupFrame) val rSocketRequester = RSocketRequester( demuxer.requesterConnection(), @@ -272,6 +347,30 @@ object RSocketFactory { } .ignoreElement() } + + private fun enableLease(parentInterceptors: InterceptorRegistry) + : InterceptorRegistry = + if (leaseRef != null) { + parentInterceptors.copyWith( + ServerLeaseSupport.enable(leaseRef)()) + } else { + parentInterceptors.copy() + } + + private fun enableServerContract(parentInterceptors: InterceptorRegistry) + : InterceptorRegistry { + + parentInterceptors.connectionFirst( + ServerContractInterceptor(errorConsumer, leaseRef != null)) + return parentInterceptors + } + + private fun enableFragmentation(parentInterceptors: InterceptorRegistry) + : InterceptorRegistry { + parentInterceptors.connectionFirst( + FragmentationInterceptor(mtu)) + return parentInterceptors + } } } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt index 291946a54..f9437cc6e 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt @@ -15,71 +15,13 @@ */ package io.rsocket.kotlin -import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_M - -import io.rsocket.kotlin.internal.frame.SetupFrameFlyweight -import java.nio.ByteBuffer - /** * Exposed to server for determination of RequestHandler based on mime types * and SETUP metadata/data */ -abstract class Setup : Payload, KeepAlive { - - abstract fun metadataMimeType(): String - - abstract fun dataMimeType(): String - - abstract val flags: Int - - fun willClientHonorLease(): Boolean = Frame.isFlagSet(flags, HONOR_LEASE) - - override fun hasMetadata(): Boolean = Frame.isFlagSet(flags, FLAGS_M) - - private class SetupImpl( - private val metadataMimeType: String, - private val dataMimeType: String, - override val data: ByteBuffer, - override val metadata: ByteBuffer, - private val keepAliveInterval: Int, - private val keepAliveLifetime: Int, - override val flags: Int) : Setup() { - - init { - if (!hasMetadata() && metadata.remaining() > 0) { - throw IllegalArgumentException("metadata flag incorrect") - } - } - - override fun keepAliveInterval(): Duration = - Duration.ofMillis(keepAliveInterval) - - override fun keepAliveMaxLifeTime(): Duration = - Duration.ofMillis(keepAliveLifetime) - - override fun metadataMimeType(): String = metadataMimeType - - override fun dataMimeType(): String = dataMimeType - } - - companion object { +interface Setup : Payload { - private const val HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE + fun metadataMimeType(): String - internal fun create(setupFrame: Frame): Setup { - Frame.ensureFrameType(FrameType.SETUP, setupFrame) - return try { - SetupImpl( - Frame.Setup.metadataMimeType(setupFrame), - Frame.Setup.dataMimeType(setupFrame), - setupFrame.data, - setupFrame.metadata, - Frame.Setup.keepaliveInterval(setupFrame), - Frame.Setup.maxLifetime(setupFrame), - Frame.Setup.getFlags(setupFrame)) - } finally { - setupFrame.release() - } - } - } + fun dataMimeType(): String } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt new file mode 100644 index 000000000..cff417969 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt @@ -0,0 +1,18 @@ +package io.rsocket.kotlin.exceptions + +import io.rsocket.kotlin.internal.lease.Lease + +class MissingLeaseException(lease: Lease, tag: String) + : RejectedException(leaseMessage(lease, tag)) { + + override fun fillInStackTrace(): Throwable = this + + companion object { + internal fun leaseMessage(lease: Lease, tag: String): String { + val expired = lease.isExpired + val allowedRequests = lease.allowedRequests + return "$tag: Missing lease. " + + "Expired: $expired, allowedRequests: $allowedRequests" + } + } +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt index 42e23f82c..6014650c4 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt @@ -17,7 +17,7 @@ package io.rsocket.kotlin.exceptions import io.rsocket.kotlin.internal.frame.ErrorFrameFlyweight -class RejectedException : RSocketException, Retryable { +open class RejectedException : RSocketException, Retryable { constructor(message: String) : super(message) constructor(message: String, cause: Throwable) : super(message, cause) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt index 3a0384972..92dfd6759 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt @@ -29,8 +29,8 @@ import org.reactivestreams.Publisher import org.slf4j.LoggerFactory internal class ServerConnectionDemuxer(source: DuplexConnection, - plugins: InterceptorRegistry) - : ConnectionDemuxer(source, plugins) { + interceptors: InterceptConnection) + : ConnectionDemuxer(source, interceptors) { override fun demux(frame: Frame): Type { val streamId = frame.streamId @@ -45,8 +45,8 @@ internal class ServerConnectionDemuxer(source: DuplexConnection, } internal class ClientConnectionDemuxer(source: DuplexConnection, - plugins: InterceptorRegistry) - : ConnectionDemuxer(source, plugins) { + interceptors: InterceptConnection) + : ConnectionDemuxer(source, interceptors) { override fun demux(frame: Frame): Type { val streamId = frame.streamId @@ -61,27 +61,26 @@ internal class ClientConnectionDemuxer(source: DuplexConnection, } sealed class ConnectionDemuxer(private val source: DuplexConnection, - plugins: InterceptorRegistry) { - + interceptors: InterceptConnection) { private val setupConnection: DuplexConnection private val responderConnection: DuplexConnection private val requesterConnection: DuplexConnection private val serviceConnection: DuplexConnection init { - val src = plugins.interceptConnection(ALL, source) + val src = interceptors.interceptConnection(ALL, source) val setupConn = DemuxedConnection(src) - setupConnection = plugins.interceptConnection(Type.SETUP, setupConn) + setupConnection = interceptors.interceptConnection(Type.SETUP, setupConn) val requesterConn = DemuxedConnection(src) - requesterConnection = plugins.interceptConnection(REQUESTER, requesterConn) + requesterConnection = interceptors.interceptConnection(REQUESTER, requesterConn) val responderConn = DemuxedConnection(src) - responderConnection = plugins.interceptConnection(RESPONDER, responderConn) + responderConnection = interceptors.interceptConnection(RESPONDER, responderConn) val serviceConn = DemuxedConnection(src) - serviceConnection = plugins.interceptConnection(SERVICE, serviceConn) + serviceConnection = interceptors.interceptConnection(SERVICE, serviceConn) src.receive() .groupBy(::demux) @@ -126,7 +125,8 @@ sealed class ConnectionDemuxer(private val source: DuplexConnection, val frames = if (debugEnabled) { Flowable.fromPublisher(frame) .doOnNext { f -> - LOGGER.debug("sending -> " + f.toString()) } + LOGGER.debug("sending -> " + f.toString()) + } } else { frame } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt index 8e3df235d..1257e1d08 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt @@ -17,13 +17,16 @@ package io.rsocket.kotlin.internal import io.rsocket.kotlin.DuplexConnection +import io.rsocket.kotlin.InterceptorOptions import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor -import io.rsocket.kotlin.InterceptorOptions import io.rsocket.kotlin.interceptors.RSocketInterceptor -import java.util.ArrayList +import java.util.* -internal class InterceptorRegistry : InterceptorOptions { +internal class InterceptorRegistry : + InterceptorOptions, + InterceptConnection, + InterceptRSocket { private val connections = ArrayList() private val requesters = ArrayList() private val handlers = ArrayList() @@ -36,9 +39,13 @@ internal class InterceptorRegistry : InterceptorOptions { this.handlers.addAll(interceptorRegistry.handlers) } - fun copyWith(action: (InterceptorRegistry) -> Unit): InterceptorRegistry { - val copy = InterceptorRegistry(this) - action(copy) + fun copy(): InterceptorRegistry = InterceptorRegistry(this) + + fun copyWith(registry: InterceptorRegistry): InterceptorRegistry { + val copy = copy() + copy.connections.addAll(registry.connections) + copy.requesters.addAll(registry.requesters) + copy.handlers.addAll(registry.handlers) return copy } @@ -58,7 +65,7 @@ internal class InterceptorRegistry : InterceptorOptions { handlers.add(interceptor) } - fun interceptRequester(rSocket: RSocket): RSocket { + override fun interceptRequester(rSocket: RSocket): RSocket { var rs = rSocket for (interceptor in requesters) { rs = interceptor(rs) @@ -66,7 +73,7 @@ internal class InterceptorRegistry : InterceptorOptions { return rs } - fun interceptHandler(rSocket: RSocket): RSocket { + override fun interceptHandler(rSocket: RSocket): RSocket { var rs = rSocket for (interceptor in handlers) { rs = interceptor(rs) @@ -74,7 +81,7 @@ internal class InterceptorRegistry : InterceptorOptions { return rs } - fun interceptConnection( + override fun interceptConnection( type: DuplexConnectionInterceptor.Type, connection: DuplexConnection): DuplexConnection { var conn = connection @@ -84,3 +91,19 @@ internal class InterceptorRegistry : InterceptorOptions { return conn } } + +internal interface InterceptConnection { + + fun interceptConnection( + type: DuplexConnectionInterceptor.Type, + connection: DuplexConnection): DuplexConnection +} + +internal interface InterceptRSocket { + + fun interceptRequester(rSocket: RSocket): RSocket + + fun interceptHandler(rSocket: RSocket): RSocket +} + + diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt index 8fc867cb2..b6c2326ff 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt @@ -9,9 +9,9 @@ import io.rsocket.kotlin.exceptions.InvalidSetupException import io.rsocket.kotlin.exceptions.RSocketException import io.rsocket.kotlin.exceptions.RejectedResumeException import io.rsocket.kotlin.exceptions.RejectedSetupException -import io.rsocket.kotlin.internal.frame.SetupFrameFlyweight import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor.Type +import io.rsocket.kotlin.internal.frame.SetupFrameFlyweight import io.rsocket.kotlin.util.DuplexConnectionProxy internal class ServerContractInterceptor( @@ -20,10 +20,10 @@ internal class ServerContractInterceptor( private val leaseEnabled: Boolean, private val resumeEnabled: Boolean) : DuplexConnectionInterceptor { - constructor(errorConsumer: (Throwable) -> Unit) : + constructor(errorConsumer: (Throwable) -> Unit, leaseEnabled: Boolean) : this(errorConsumer, SetupFrameFlyweight.CURRENT_VERSION, - false, + leaseEnabled, false) override fun invoke(type: Type, source: DuplexConnection): DuplexConnection = diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt index 1b71018df..317908d8b 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt @@ -36,8 +36,9 @@ internal abstract class ServiceHandler(private val serviceConnection: DuplexConn protected abstract fun handleKeepAlive(frame: Frame) + @Suppress("UNUSED_PARAMETER") private fun handleLease(frame: Frame) { - errorConsumer(IllegalArgumentException("Lease is not supported: $frame")) + /*Lease interceptors processed frame already, just release it here*/ } private fun handleError(frame: Frame) { diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt new file mode 100644 index 000000000..5aa82095b --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt @@ -0,0 +1,46 @@ +package io.rsocket.kotlin.internal + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight +import java.nio.ByteBuffer + +internal class SetupContents( + private val metadataMimeType: String, + private val dataMimeType: String, + override val data: ByteBuffer, + override val metadata: ByteBuffer, + private val keepAliveInterval: Int, + private val keepAliveLifetime: Int, + private val flags: Int) : Setup, KeepAlive { + + override fun keepAliveInterval(): Duration = + Duration.ofMillis(keepAliveInterval) + + override fun keepAliveMaxLifeTime(): Duration = + Duration.ofMillis(keepAliveLifetime) + + override fun metadataMimeType(): String = metadataMimeType + + override fun dataMimeType(): String = dataMimeType + + override fun hasMetadata(): Boolean = Frame.isFlagSet(flags, FrameHeaderFlyweight.FLAGS_M) + + companion object { + + internal fun create(setupFrame: Frame): SetupContents { + Frame.ensureFrameType(FrameType.SETUP, setupFrame) + return try { + SetupContents( + Frame.Setup.metadataMimeType(setupFrame), + Frame.Setup.dataMimeType(setupFrame), + setupFrame.data, + setupFrame.metadata, + Frame.Setup.keepaliveInterval(setupFrame), + Frame.Setup.maxLifetime(setupFrame), + Frame.Setup.getFlags(setupFrame)) + } finally { + setupFrame.release() + } + } + } +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt index 860c26a1a..9133cdf4c 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt @@ -28,7 +28,11 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable { private val isDisposed = AtomicBoolean() private val blueprintFrame = frame.retain() private val dataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer() - private val metadataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer() + private val metadataBuffer = + if (frame.hasMetadata()) + PooledByteBufAllocator.DEFAULT.compositeBuffer() + else + null fun append(frame: Frame): StreamFramesReassembler { val byteBuf = frame.content() @@ -37,14 +41,14 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable { val metadataLength = FrameHeaderFlyweight.metadataLength( byteBuf, frameType, - frameLength)!! + frameLength) ?: 0 val dataLength = FrameHeaderFlyweight.dataLength(byteBuf, frameType) if (0 < metadataLength) { var metadataOffset = FrameHeaderFlyweight.metadataOffset(byteBuf) if (FrameHeaderFlyweight.hasMetadataLengthField(frameType)) { metadataOffset += FrameHeaderFlyweight.FRAME_LENGTH_SIZE } - metadataBuffer.addComponent( + metadataBuffer!!.addComponent( true, byteBuf.retainedSlice(metadataOffset, metadataLength)) } @@ -73,7 +77,7 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable { if (isDisposed.compareAndSet(false, true)) { blueprintFrame.release() dataBuffer.release() - metadataBuffer.release() + metadataBuffer?.release() } } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt index 330e81faf..409a252cc 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt @@ -169,6 +169,9 @@ internal object SetupFrameFlyweight { fun keepaliveInterval(byteBuf: ByteBuf): Int = byteBuf.getInt(KEEPALIVE_INTERVAL_FIELD_OFFSET) + fun supportsLease(flags: Int): Boolean = + (flags and FLAGS_WILL_HONOR_LEASE) == FLAGS_WILL_HONOR_LEASE + fun maxLifetime(byteBuf: ByteBuf): Int = byteBuf.getInt(MAX_LIFETIME_FIELD_OFFSET) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt new file mode 100644 index 000000000..fbb5dbcf1 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.reactivex.Completable +import io.rsocket.kotlin.LeaseRef +import java.nio.ByteBuffer + +internal class ConnectionLeaseRef(private val leaseGranterConnection + : LeaseGranterConnection) : LeaseRef { + + override fun grantLease(numberOfRequests: Int, + ttlMillis: Long, + metadata: ByteBuffer): Completable { + return grant( + numberOfRequests, + ttlMillis, + metadata) + } + + override fun grantLease(numberOfRequests: Int, + timeToLiveMillis: Long): Completable { + return grant( + numberOfRequests, + timeToLiveMillis, + null) + } + + override fun onClose(): Completable = leaseGranterConnection.onClose() + + private fun grant( + numberOfRequests: Int, + ttlMillis: Long, + metadata: ByteBuffer?): Completable { + assertArgs(numberOfRequests, ttlMillis) + val ttl = Math.toIntExact(ttlMillis) + return leaseGranterConnection.grantLease( + numberOfRequests, + ttl, + metadata) + } + + private fun assertArgs(numberOfRequests: Int, ttl: Long) { + if (numberOfRequests <= 0) { + throw IllegalArgumentException( + "numberOfRequests must be non-negative: $numberOfRequests") + } + if (ttl <= 0) { + throw IllegalArgumentException( + "timeToLive must be non-negative: $ttl") + } + } +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt new file mode 100644 index 000000000..9253f1b83 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt @@ -0,0 +1,71 @@ +/* + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import java.nio.ByteBuffer + +/** A contract for RSocket lease, which is sent by a request acceptor and is time bound. */ +interface Lease { + + /** + * Number of requests allowed by this lease. + * + * @return The number of requests allowed by this lease. + */ + val allowedRequests: Int + + /** + * Number of milliseconds that this lease is valid from the time it is received. + * + * @return Number of milliseconds that this lease is valid from the time it is received. + */ + val ttl: Int + + /** + * Metadata for the lease. + * + * @return Metadata for the lease. + */ + val metadata: ByteBuffer? + + /** + * Checks if the lease is expired now. + * + * @return `true` if the lease has expired. + */ + val isExpired: Boolean + get() = isExpired(System.currentTimeMillis()) + + /** Checks if the lease has not expired and there are allowed requests available */ + val isValid: Boolean + get() = !isExpired && allowedRequests > 0 + + /** + * Absolute time since epoch at which this lease will expire. + * + * @return Absolute time since epoch at which this lease will expire. + */ + fun expiry(): Long + + /** + * Checks if the lease is expired for the passed `now`. + * + * @param now current time in millis. + * @return `true` if the lease has expired. + */ + fun isExpired(now: Long): Boolean = now >= expiry() +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt new file mode 100644 index 000000000..978de9120 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +/** State shared by Lease related interceptors */ +internal class LeaseContext(var leaseEnabled: Boolean = true) { + + override fun toString(): String = + "LeaseContext{leaseEnabled=$leaseEnabled}" +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt new file mode 100644 index 000000000..c6dc64765 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt @@ -0,0 +1,55 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.reactivex.Flowable +import io.rsocket.kotlin.DuplexConnection +import io.rsocket.kotlin.Frame +import io.rsocket.kotlin.FrameType +import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor +import io.rsocket.kotlin.util.DuplexConnectionProxy + +internal class LeaseEnablingInterceptor(private val leaseContext: LeaseContext) + : DuplexConnectionInterceptor { + + override fun invoke(type: DuplexConnectionInterceptor.Type, + connection: DuplexConnection) + : DuplexConnection = + if (type === DuplexConnectionInterceptor.Type.SETUP) { + LeaseEnablingConnection(connection, leaseContext) + } else { + connection + } + + private class LeaseEnablingConnection( + setupConnection: DuplexConnection, + private val leaseContext: LeaseContext) + : DuplexConnectionProxy(setupConnection) { + + override fun receive(): Flowable { + return super.receive() + .doOnNext( + { f -> + val enabled = f.type == FrameType.SETUP + && Frame.Setup.leaseEnabled(f) + leaseContext.leaseEnabled = enabled + }) + } + } +} + + diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt new file mode 100644 index 000000000..1b2f812f5 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.reactivex.Completable +import java.nio.ByteBuffer + +interface LeaseGranter { + + fun grantLease(requests: Int, ttl: Int, + metadata: ByteBuffer?): Completable +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt new file mode 100644 index 000000000..7a9b7e6d8 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.netty.buffer.Unpooled +import io.reactivex.Completable +import io.reactivex.Flowable +import io.rsocket.kotlin.DuplexConnection +import io.rsocket.kotlin.Frame +import io.rsocket.kotlin.FrameType +import io.rsocket.kotlin.util.DuplexConnectionProxy +import org.reactivestreams.Publisher +import java.nio.ByteBuffer + +internal class LeaseGranterConnection( + private val leaseContext: LeaseContext, + source: DuplexConnection, + private val sendManager: LeaseManager, + private val receiveManager: LeaseManager) + : DuplexConnectionProxy(source), LeaseGranter { + + override fun send(frame: Publisher): Completable { + return super.send(Flowable.fromPublisher(frame) + .doOnNext { f -> leaseGrantedTo(f, receiveManager) }) + } + + override fun receive(): Flowable { + return super.receive() + .doOnNext { f -> leaseGrantedTo(f, sendManager) } + } + + private fun leaseGrantedTo(f: Frame, leaseManager: LeaseManager) { + if (isEnabled() && isLease(f)) { + val requests = Frame.Lease.numberOfRequests(f) + val ttl = Frame.Lease.ttl(f) + leaseManager.grantLease(requests, ttl) + } + } + + override fun grantLease(requests: Int, + ttl: Int, + metadata: ByteBuffer?): Completable { + + val byteBuf = if (metadata == null) Unpooled.EMPTY_BUFFER + else Unpooled.wrappedBuffer(metadata) + + return when { + ttl <= 0 -> return Completable + .error(IllegalArgumentException( + "Time-to-live should be positive")) + requests <= 0 -> Completable + .error(IllegalArgumentException( + "Allowed requests should be positive")) + else -> send(Flowable.just(Frame.Lease.from(ttl, requests, byteBuf))) + } + } + + private fun isLease(f: Frame): Boolean { + return f.type === FrameType.LEASE + } + + private fun isEnabled() = leaseContext.leaseEnabled +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt new file mode 100644 index 000000000..b538ece44 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.rsocket.kotlin.DuplexConnection +import io.rsocket.kotlin.LeaseRef +import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor + +internal class LeaseGranterInterceptor( + private val leaseContext: LeaseContext, + private val sender: LeaseManager, + private val receiver: LeaseManager, + private val leaseHandle: (LeaseRef) -> Unit) + : DuplexConnectionInterceptor { + + override fun invoke(type: DuplexConnectionInterceptor.Type, + connection: DuplexConnection): DuplexConnection { + return if (type === DuplexConnectionInterceptor.Type.SERVICE) { + val leaseGranterConnection = LeaseGranterConnection( + leaseContext, + connection, + sender, + receiver) + val leaseConnectionRef = ConnectionLeaseRef(leaseGranterConnection) + leaseHandle(leaseConnectionRef) + leaseGranterConnection + } else { + connection + } + } +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt new file mode 100644 index 000000000..9de07dfba --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt @@ -0,0 +1,74 @@ +package io.rsocket.kotlin.internal.lease + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicInteger + +internal class LeaseImpl(private val startingNumberOfRequests: Int, + override val ttl: Int, + override val metadata: ByteBuffer?) : Lease { + private val numberOfRequests: AtomicInteger + private val expiry: Long + + init { + assertNumberOfRequests(startingNumberOfRequests, ttl) + this.numberOfRequests = AtomicInteger(startingNumberOfRequests) + this.expiry = now() + ttl + } + + override val allowedRequests: Int + get() = Math.max(0, numberOfRequests.get()) + + override val isValid: Boolean + get() = startingNumberOfRequests > 0 + && allowedRequests > 0 + && !isExpired + + override fun expiry(): Long { + return expiry + } + + fun availability(): Double { + return if (isValid) allowedRequests / + startingNumberOfRequests.toDouble() else 0.0 + } + + fun use(useRequestCount: Int): Boolean { + assertUseRequests(useRequestCount) + return !isExpired && numberOfRequests.accAndGet( + useRequestCount, + { cur, update -> Math.max(-1, cur - update) }) >= 0 + } + + companion object { + + private fun now() = System.currentTimeMillis() + + private inline fun AtomicInteger.accAndGet(update: Int, + acc: (Int, Int) -> Int): Int { + var cur: Int + var next: Int + do { + cur = get() + next = acc.invoke(cur, update) + } while (!compareAndSet(cur, next)) + return next + } + + fun invalidLease(): LeaseImpl = LeaseImpl(0, 0, null) + + fun assertUseRequests(useRequestCount: Int) { + if (useRequestCount <= 0) { + throw IllegalArgumentException("Number of requests must be positive") + } + } + + private fun assertNumberOfRequests(numberOfRequests: Int, ttl: Int) { + if (numberOfRequests < 0) { + throw IllegalArgumentException("Number of requests must be non-negative") + } + if (ttl < 0) { + throw IllegalArgumentException("Time-to-live must be non-negative") + } + } + } +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt new file mode 100644 index 000000000..b8ae2f4e9 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.interceptors.RSocketInterceptor + +internal class LeaseInterceptor(private val leaseContext: LeaseContext, + private val leaseManager: LeaseManager, + private val tag: String) + : RSocketInterceptor { + + override fun invoke(rSocket: RSocket): RSocket = + LeaseRSocket( + leaseContext, + rSocket, + tag, + leaseManager) +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt new file mode 100644 index 000000000..14224d4c8 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.rsocket.kotlin.exceptions.MissingLeaseException + +/** Updates Lease on use and grant */ +internal class LeaseManager(private val tag: String) { + @Volatile + private var currentLease = INVALID_MUTABLE_LEASE + + init { + requireNotNull(tag, { "tag" }) + } + + fun availability(): Double { + return currentLease.availability() + } + + fun grantLease(numberOfRequests: Int, ttl: Int) { + assertGrantedLease(numberOfRequests, ttl) + this.currentLease = LeaseImpl(numberOfRequests, ttl, null) + } + + fun useLease(): Result = + if (currentLease.use(1)) + Success + else + Error(MissingLeaseException(currentLease, tag)) + + override fun toString(): String { + return "LeaseManager{tag='$tag'}" + } + + companion object { + private val INVALID_MUTABLE_LEASE = LeaseImpl.invalidLease() + + private fun assertGrantedLease(numberOfRequests: Int, ttl: Int) { + if (numberOfRequests <= 0) { + throw IllegalArgumentException("numberOfRequests must be positive") + } + if (ttl <= 0) { + throw IllegalArgumentException("time-to-live must be positive") + } + } + } +} + +internal sealed class Result + +internal object Success : Result() + +internal data class Error(val ex: Throwable) : Result() \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt new file mode 100644 index 000000000..7263cc229 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt @@ -0,0 +1,95 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.reactivex.Completable +import io.reactivex.Flowable +import io.reactivex.Single +import io.rsocket.kotlin.Payload +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.util.RSocketProxy +import org.reactivestreams.Publisher + +internal class LeaseRSocket( + private val leaseContext: LeaseContext, + source: RSocket, private + val tag: String, + private val leaseManager: LeaseManager) : RSocketProxy(source) { + + override fun fireAndForget(payload: Payload): Completable { + return request(super.fireAndForget(payload)) + } + + override fun requestResponse(payload: Payload): Single { + return request(super.requestResponse(payload)) + } + + override fun requestStream(payload: Payload): Flowable { + return request(super.requestStream(payload)) + } + + override fun requestChannel(payloads: Publisher): Flowable { + return request(super.requestChannel(payloads)) + } + + override fun availability(): Double { + return Math.min(super.availability(), leaseManager.availability()) + } + + override fun toString(): String = + "LeaseRSocket(leaseContext=$leaseContext," + + " tag='$tag', " + + "leaseManager=$leaseManager)" + + private fun request(actual: Completable): Completable = + request( + { Completable.defer(it) }, + actual, + { Completable.error(it) }) + + private fun request(actual: Single): Single = + request( + { Single.defer(it) }, + actual, + { Single.error(it) }) + + private fun request(actual: Flowable): Flowable = + request( + { Flowable.defer(it) }, + actual, + { Flowable.error(it) }) + + private fun request( + defer: (() -> T) -> T, + actual: T, + error: (Throwable) -> T): T { + + return defer { + if (isEnabled()) { + val result = leaseManager.useLease() + when (result) { + is Success -> actual + is Error -> error(result.ex) + } + } else { + actual + } + } + } + + private fun isEnabled() = leaseContext.leaseEnabled +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt new file mode 100644 index 000000000..f075af7d5 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2018 Maksym Ostroverkhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.lease + +import io.rsocket.kotlin.LeaseRef +import io.rsocket.kotlin.internal.InterceptorRegistry + +internal sealed class LeaseSupport { + + abstract fun enable(leaseHandle: (LeaseRef) -> Unit): () -> InterceptorRegistry +} + +internal object ServerLeaseSupport : LeaseSupport() { + + override fun enable(leaseHandle: (LeaseRef) -> Unit) + : () -> InterceptorRegistry = { + + val sender = LeaseManager(serverRequester) + val receiver = LeaseManager(serverResponder) + val leaseContext = LeaseContext() + val registry = InterceptorRegistry() + + /*requester RSocket is Lease aware*/ + registry.requester(LeaseInterceptor( + leaseContext, + sender, + serverRequester)) + /*handler RSocket is Lease aware*/ + registry.handler(LeaseInterceptor( + leaseContext, + receiver, + serverResponder)) + /*grants Lease quotas of above RSockets*/ + registry.connection(LeaseGranterInterceptor( + leaseContext, + sender, + receiver, + leaseHandle)) + /*enables lease for particular connection*/ + registry.connection(LeaseEnablingInterceptor(leaseContext)) + registry + } + + private const val serverRequester = "server requester" + private const val serverResponder = "server responder" +} + +internal object ClientLeaseSupport : LeaseSupport() { + private val leaseEnabled = LeaseContext() + + override fun enable(leaseHandle: (LeaseRef) -> Unit) + : () -> InterceptorRegistry = { + + val sender = LeaseManager(clientRequester) + val receiver = LeaseManager(clientResponder) + val registry = InterceptorRegistry() + /*requester RSocket is Lease aware*/ + registry.requester(LeaseInterceptor( + leaseEnabled, + sender, clientRequester)) + /*handler RSocket is Lease aware*/ + registry.handler(LeaseInterceptor( + leaseEnabled, + receiver, + clientResponder)) + /*grants Lease quotas to above RSockets*/ + registry.connection(LeaseGranterInterceptor( + leaseEnabled, + sender, + receiver, + leaseHandle)) + registry + } + + private const val clientRequester = "client requester" + private const val clientResponder = "client responder" +} diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt index b08f6c73b..956282a33 100644 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt @@ -35,15 +35,6 @@ class ServiceHandlerTest { conn.close().subscribe() } - @Test - fun serviceHandlerLease() { - ServerServiceHandler(conn, keepAlive, errors) - receiver.onNext(Frame.Lease.from(1000, 42, EMPTY_BUFFER)) - val errs = errors.get() - assertEquals(1, errs.size) - assertTrue(errs.first() is IllegalArgumentException) - } - @Test fun serviceHandlerError() { ServerServiceHandler(conn, keepAlive, errors) diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt index eaccd470c..737752f46 100644 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt @@ -1,8 +1,8 @@ package io.rsocket.kotlin.frame -import io.rsocket.kotlin.Frame -import io.rsocket.kotlin.Setup import io.rsocket.kotlin.DefaultPayload +import io.rsocket.kotlin.Frame +import io.rsocket.kotlin.internal.SetupContents import org.junit.Assert.assertEquals import org.junit.Test @@ -14,7 +14,7 @@ class SetupFrameTest { "metadataMime", "dataMime", DefaultPayload.textPayload("data", "metadata")) - val setup = Setup.create(setupFrame) + val setup = SetupContents.create(setupFrame) assertEquals(setup.keepAliveInterval().millis, 100) assertEquals(setup.keepAliveMaxLifeTime().millis, 1000) assertEquals(setup.metadataMimeType(), "metadataMime") diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt index f69bf462e..4fe467ad4 100644 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt @@ -19,8 +19,7 @@ package io.rsocket.kotlin.internal.fragmentation import io.rsocket.kotlin.Frame import io.rsocket.kotlin.FrameType import io.rsocket.kotlin.DefaultPayload -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue +import org.junit.Assert.* import org.junit.Test import java.nio.ByteBuffer import java.util.concurrent.ThreadLocalRandom @@ -58,6 +57,23 @@ class StreamFramesReassemblerTest { } } + @Test + fun testReassembleNullMetadata() { + val data = createRandomBytes(16) + val metadata = null + + val from = Frame.Request.from( + 1024, FrameType.REQUEST_RESPONSE, DefaultPayload(data, metadata), 1) + val frameFragmenter = FrameFragmenter(2) + val frameReassembler = StreamFramesReassembler(from) + frameFragmenter.fragment(from) + .doOnNext { frameReassembler.append(it) } + .blockingLast() + val reassemble = frameReassembler.reassemble() + assertFalse(reassemble.hasMetadata()) + assertFalse(reassemble.metadata.hasRemaining()) + } + private fun createRandomBytes(size: Int): ByteBuffer { val bytes = ByteArray(size) ThreadLocalRandom.current().nextBytes(bytes) diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt new file mode 100644 index 000000000..160d49851 --- /dev/null +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt @@ -0,0 +1,74 @@ +package io.rsocket.kotlin.internal.lease + +import io.netty.buffer.Unpooled +import io.reactivex.Completable +import io.reactivex.Flowable +import io.reactivex.processors.PublishProcessor +import io.rsocket.kotlin.Frame +import io.rsocket.kotlin.FrameType +import io.rsocket.kotlin.internal.lease.LeaseContext +import io.rsocket.kotlin.internal.lease.LeaseGranterConnection +import io.rsocket.kotlin.internal.lease.LeaseManager +import io.rsocket.kotlin.test.util.LocalDuplexConnection +import org.junit.Assert.* +import org.junit.Before +import org.junit.Test +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit + +class LeaseGranterConnectionTest { + + private lateinit var leaseGranterConnection: LeaseGranterConnection + private lateinit var sender: PublishProcessor + private lateinit var receiver: PublishProcessor + private lateinit var send: LeaseManager + private lateinit var receive: LeaseManager + + @Before + fun setUp() { + val leaseContext = LeaseContext() + send = LeaseManager("send") + receive = LeaseManager("receive") + sender = PublishProcessor.create() + receiver = PublishProcessor.create() + val local = LocalDuplexConnection("test", sender, receiver) + leaseGranterConnection = LeaseGranterConnection( + leaseContext, + local, + send, + receive) + } + + @Test + fun sentLease() { + leaseGranterConnection.send( + Flowable.just(Frame.Lease.from(2, 1, Unpooled.EMPTY_BUFFER))) + .blockingAwait() + assertEquals(1.0, receive.availability(), 1e-5) + assertEquals(0.0, send.availability(), 1e-5) + } + + @Test + fun receivedLease() { + leaseGranterConnection.receive().subscribe() + receiver.onNext(Frame.Lease.from(2, 1, Unpooled.EMPTY_BUFFER)) + assertEquals(0.0, receive.availability(), 1e-5) + assertEquals(1.0, send.availability(), 1e-5) + } + + @Test + fun grantLease() { + Completable.timer(100, TimeUnit.MILLISECONDS) + .andThen(Completable.defer { + leaseGranterConnection + .grantLease(2, 1, ByteBuffer.allocateDirect(0)) + }) + .subscribe() + val f = sender.firstOrError().blockingGet() + + assertNotNull(f) + assertTrue(f.type === FrameType.LEASE) + assertEquals(2, Frame.Lease.numberOfRequests(f)) + assertEquals(1, Frame.Lease.ttl(f)) + } +} diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt new file mode 100644 index 000000000..3f5581664 --- /dev/null +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt @@ -0,0 +1,65 @@ +package io.rsocket.kotlin.internal.lease + +import io.reactivex.Completable +import io.rsocket.kotlin.exceptions.MissingLeaseException +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test +import java.util.concurrent.TimeUnit + +class LeaseManagerTest { + + private lateinit var leaseManager: LeaseManager + + @Before + fun setUp() { + leaseManager = LeaseManager("") + } + + @Test + fun initialLeaseAvailability() { + assertEquals(0.0, leaseManager.availability(), 1e-5) + } + + @Test + fun useNoRequests() { + val result = leaseManager.useLease() + assertTrue(result is Error) + result as Error + assertTrue(result.ex is MissingLeaseException) + } + + @Test + fun grant() { + leaseManager.grantLease(2, 1_000) + assertEquals(1.0, leaseManager.availability(), 1e-5) + } + + @Test(expected = IllegalArgumentException::class) + fun grantLeaseZeroRequests() { + leaseManager.grantLease(0, 1_000) + } + + @Test(expected = IllegalArgumentException::class) + fun grantLeaseZeroTtl() { + leaseManager.grantLease(1, 0) + } + + @Test + fun use() { + leaseManager.grantLease(2, 1_000) + leaseManager.useLease() + assertEquals(0.5, leaseManager.availability(), 1e-5) + } + + @Test + fun useTimeout() { + leaseManager.grantLease(2, 1_000) + Completable.timer(1500, TimeUnit.MILLISECONDS).blockingAwait() + val result = leaseManager.useLease() + assertTrue(result is Error) + result as Error + assertTrue(result.ex is MissingLeaseException) + } +} diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt new file mode 100644 index 000000000..250365eb2 --- /dev/null +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt @@ -0,0 +1,88 @@ +package io.rsocket.kotlin.internal.lease + +import io.reactivex.Completable +import io.reactivex.Flowable +import io.reactivex.Single +import io.rsocket.kotlin.DefaultPayload +import io.rsocket.kotlin.Payload +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.exceptions.MissingLeaseException +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test +import org.reactivestreams.Publisher + +class LeaseRSocketTest { + + private lateinit var leaseRSocket: LeaseRSocket + private lateinit var rSocket: MockRSocket + private lateinit var leaseManager: LeaseManager + + @Before + fun setUp() { + rSocket = MockRSocket() + leaseManager = LeaseManager("") + val leaseEnabled = LeaseContext() + leaseRSocket = LeaseRSocket(leaseEnabled, rSocket, "", leaseManager) + } + + @Test + fun grantedLease() { + leaseManager.grantLease(2, 1_000) + assertEquals(1.0, leaseRSocket.availability(), 1e-5) + } + + @Test + fun usedLease() { + leaseManager.grantLease(2, 1_000) + leaseRSocket.fireAndForget(DefaultPayload("test")).subscribe() + assertEquals(0.5, leaseRSocket.availability(), 1e-5) + } + + @Test + fun depletedLease() { + leaseManager.grantLease(1, 1_000) + val fireAndForget = leaseRSocket.fireAndForget(DefaultPayload("test")) + val firstErr = fireAndForget.blockingGet() + assertTrue(firstErr == null) + val secondErr = fireAndForget.blockingGet() + assertTrue(secondErr is MissingLeaseException) + } + + @Test + fun connectionNotAvailable() { + leaseManager.grantLease(1, 1_000) + rSocket.setAvailability(0.0f) + assertEquals(0.0, leaseRSocket.availability(), 1e-5) + } + + private class MockRSocket : RSocket { + private var availability = 1.0f + + fun setAvailability(availability: Float) { + this.availability = availability + } + + override fun availability(): Double = availability.toDouble() + + override fun fireAndForget(payload: Payload): Completable = + Completable.complete() + + override fun requestResponse(payload: Payload): Single = + Single.just(payload) + + override fun requestStream(payload: Payload): Flowable = + Flowable.just(payload) + + override fun requestChannel(payloads: Publisher): Flowable = + Flowable.fromPublisher(payloads) + + override fun metadataPush(payload: Payload): Completable = + Completable.complete() + + override fun close(): Completable = Completable.complete() + + override fun onClose(): Completable = Completable.complete() + } +} \ No newline at end of file diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt new file mode 100644 index 000000000..238149ed9 --- /dev/null +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt @@ -0,0 +1,118 @@ +package io.rsocket.kotlin.test.lease + +import io.reactivex.Single +import io.reactivex.processors.BehaviorProcessor +import io.reactivex.subscribers.TestSubscriber +import io.rsocket.kotlin.* +import io.rsocket.kotlin.exceptions.MissingLeaseException +import io.rsocket.kotlin.transport.netty.client.TcpClientTransport +import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable +import io.rsocket.kotlin.transport.netty.server.TcpServerTransport +import io.rsocket.kotlin.util.AbstractRSocket +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test +import java.util.concurrent.TimeUnit + +class LeaseServerTest { + private lateinit var nettyContextCloseable: NettyContextCloseable + private lateinit var serverLease: LeaseRefs + private lateinit var clientSocket: RSocket + private lateinit var leaseRef: LeaseRef + @Before + fun setUp() { + serverLease = LeaseRefs() + nettyContextCloseable = RSocketFactory.receive() + .enableLease(serverLease) + .acceptor { + { _, _ -> + Single.just( + object : AbstractRSocket() { + override fun requestResponse(payload: Payload) + : Single = + Single.just(payload) + }) + } + } + .transport(TcpServerTransport.create("localhost", 0)) + .start() + .blockingGet() + + val address = nettyContextCloseable.address() + clientSocket = RSocketFactory + .connect() + .enableLease(LeaseRefs()) + .keepAlive { opts -> + opts.keepAliveInterval(Duration.ofMinutes(1)) + .keepAliveMaxLifeTime(Duration.ofMinutes(20)) + } + .transport(TcpClientTransport.create(address)) + .start() + .blockingGet() + + leaseRef = serverLease.leaseRef().blockingGet() + } + + @After + fun tearDown() { + clientSocket.close().subscribe() + nettyContextCloseable.close().subscribe() + nettyContextCloseable.onClose().blockingAwait() + } + + @Test + fun grantLeaseNumberOfRequests() { + assertEquals(clientSocket.availability(), 0.0, 1e-5) + leaseRef.grantLease(2, 10_000) + .delay(100, TimeUnit.MILLISECONDS) + .blockingAwait() + assertEquals(clientSocket.availability(), 1.0, 1e-5) + clientSocket.requestResponse(payload()) + .blockingGet() + assertEquals(clientSocket.availability(), 0.5, 1e-5) + clientSocket.requestResponse(payload()) + .blockingGet() + assertEquals(clientSocket.availability(), 0.0, 1e-5) + + val subscriber = TestSubscriber() + clientSocket.requestResponse(payload()) + .toFlowable() + .blockingSubscribe(subscriber) + assertEquals(1, subscriber.errorCount()) + assertTrue(subscriber.errors().first() is MissingLeaseException) + leaseRef.grantLease(1, 10_000) + .delay(100, TimeUnit.MILLISECONDS) + .blockingAwait() + assertEquals(1.0, clientSocket.availability(), 1e-5) + } + + @Test + fun grantLeaseTtl() { + leaseRef.grantLease(2, 200) + .delay(250, TimeUnit.MILLISECONDS) + .blockingAwait() + + assertEquals(clientSocket.availability(), 0.0, 1e-5) + val subscriber = TestSubscriber() + clientSocket.requestResponse(payload()).toFlowable() + .blockingSubscribe(subscriber) + + assertEquals(1, subscriber.errorCount()) + assertTrue(subscriber.errors().first() is MissingLeaseException) + } + + private fun payload() = DefaultPayload("data") + + private class LeaseRefs : (LeaseRef) -> Unit { + private val leaseRefs = BehaviorProcessor.create() + + fun leaseRef(): Single = leaseRefs.firstOrError() + + override fun invoke(leaseRef: LeaseRef) { + leaseRefs.onNext(leaseRef) + } + } + +} \ No newline at end of file diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt new file mode 100644 index 000000000..74e650e0d --- /dev/null +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt @@ -0,0 +1,86 @@ +package io.rsocket.kotlin.test.lease.example + +import io.reactivex.Flowable +import io.reactivex.Single +import io.reactivex.processors.BehaviorProcessor +import io.reactivex.schedulers.Schedulers +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.netty.client.TcpClientTransport +import io.rsocket.kotlin.transport.netty.server.TcpServerTransport +import io.rsocket.kotlin.util.AbstractRSocket +import org.slf4j.LoggerFactory +import java.util.* +import java.util.concurrent.TimeUnit + +object LeaseClientServerExample { + private val LOGGER = LoggerFactory.getLogger(LeaseClientServerExample::class.java) + + @JvmStatic + fun main(args: Array) { + + val serverLease = LeaseRefs() + val nettyContextCloseable = RSocketFactory.receive() + .enableLease(serverLease) + .acceptor { + { _, _ -> + Single.just( + object : AbstractRSocket() { + override fun requestResponse(payload: Payload) + : Single = + Single.just(DefaultPayload("Server Response ${Date()}")) + }) + } + } + .transport(TcpServerTransport.create("localhost", 0)) + .start() + .blockingGet() + + val address = nettyContextCloseable.address() + val clientSocket = RSocketFactory.connect() + .enableLease(LeaseRefs()) + .keepAlive { opts -> + opts.keepAliveInterval(Duration.ofMinutes(1)) + .keepAliveMaxLifeTime(Duration.ofMinutes(20)) + } + .transport(TcpClientTransport.create(address)) + .start() + .blockingGet() + + Flowable.interval(1, TimeUnit.SECONDS) + .observeOn(Schedulers.io()) + .flatMap { + LOGGER.info("Availability: ${clientSocket.availability()}") + clientSocket + .requestResponse(DefaultPayload("Client request ${Date()}")) + .toFlowable() + .doOnError { LOGGER.info("Error: $it") } + .onErrorResumeNext { _: Throwable -> + Flowable.empty() + } + } + .subscribe { resp -> LOGGER.info("Client response: ${resp.dataUtf8}") } + + serverLease + .leaseRef() + .flatMapCompletable { connRef -> + Flowable.interval(1, 10, TimeUnit.SECONDS) + .flatMapCompletable { _ -> + connRef.grantLease( + numberOfRequests = 7, + timeToLiveMillis = 5_000) + } + }.subscribe() + + clientSocket.onClose().blockingAwait() + } + + private class LeaseRefs : (LeaseRef) -> Unit { + private val leaseRefs = BehaviorProcessor.create() + + fun leaseRef(): Single = leaseRefs.firstOrError() + + override fun invoke(leaseRef: LeaseRef) { + leaseRefs.onNext(leaseRef) + } + } +} diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/ClientServerChannelTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt similarity index 98% rename from test/src/test/kotlin/io/rsocket/kotlin/test/ClientServerChannelTest.kt rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt index 52fcf9e23..aab013e1b 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/ClientServerChannelTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt @@ -1,4 +1,4 @@ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.test.transport import io.reactivex.Flowable import io.reactivex.Single diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/EndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt similarity index 99% rename from test/src/test/kotlin/io/rsocket/kotlin/test/EndToEndTest.kt rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt index 292fa4c2b..7489152f7 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/EndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt @@ -1,4 +1,4 @@ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.test.transport import io.reactivex.Completable import io.reactivex.Flowable diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyTcpEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt similarity index 88% rename from test/src/test/kotlin/io/rsocket/kotlin/test/NettyTcpEndToEndTest.kt rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt index a9834b372..31632083f 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyTcpEndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt @@ -1,4 +1,4 @@ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.test.transport import io.rsocket.kotlin.transport.netty.client.TcpClientTransport import io.rsocket.kotlin.transport.netty.server.TcpServerTransport diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyWebsocketEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt similarity index 83% rename from test/src/test/kotlin/io/rsocket/kotlin/test/NettyWebsocketEndToEndTest.kt rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt index eb1e1f2d8..cf9f1af98 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyWebsocketEndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt @@ -1,5 +1,6 @@ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.test.transport +import io.rsocket.kotlin.test.transport.EndToEndTest import io.rsocket.kotlin.transport.netty.client.WebsocketClientTransport import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/OkHttpNettyEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt similarity index 85% rename from test/src/test/kotlin/io/rsocket/kotlin/test/OkHttpNettyEndToEndTest.kt rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt index 33dd9c8db..c1817e12a 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/OkHttpNettyEndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt @@ -1,5 +1,6 @@ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.test.transport +import io.rsocket.kotlin.test.transport.EndToEndTest import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport import io.rsocket.transport.okhttp.client.OkhttpWebsocketClientTransport import okhttp3.HttpUrl From d27595f7ef80741b586f50434f674717ee5c6260 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 22 May 2018 21:45:05 +0300 Subject: [PATCH 2/2] fix fragmentation leaks --- .../internal/fragmentation/FrameFragmenter.kt | 3 +-- .../fragmentation/StreamFramesReassembler.kt | 13 ++++++------- .../fragmentation/StreamFramesReassemblerTest.kt | 2 ++ .../test/transport/ClientServerChannelTest.kt | 5 ++--- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt index 4d1d13c4a..e42281547 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt @@ -75,9 +75,8 @@ internal class FrameFragmenter(private val mtu: Int) { } } - private class State(frame: Frame) : Disposable { + private class State(private val frame: Frame) : Disposable { private val disposed = AtomicBoolean() - private val frame: Frame = frame.retain() private val data: ByteBuf = sliceFrameData(frame.content()) private val metadata: ByteBuf? = if (frame.hasMetadata()) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt index 9133cdf4c..24750f39d 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean internal class StreamFramesReassembler(frame: Frame) : Disposable { private val isDisposed = AtomicBoolean() - private val blueprintFrame = frame.retain() + private val blueprintFrame = frame private val dataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer() private val metadataBuffer = if (frame.hasMetadata()) @@ -43,23 +43,23 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable { frameType, frameLength) ?: 0 val dataLength = FrameHeaderFlyweight.dataLength(byteBuf, frameType) - if (0 < metadataLength) { + if (metadataLength > 0) { var metadataOffset = FrameHeaderFlyweight.metadataOffset(byteBuf) if (FrameHeaderFlyweight.hasMetadataLengthField(frameType)) { metadataOffset += FrameHeaderFlyweight.FRAME_LENGTH_SIZE } metadataBuffer!!.addComponent( true, - byteBuf.retainedSlice(metadataOffset, metadataLength)) + byteBuf.slice(metadataOffset, metadataLength)) } - if (0 < dataLength) { + if (dataLength > 0) { val dataOffset = FrameHeaderFlyweight.dataOffset( byteBuf, frameType, frameLength) dataBuffer.addComponent( true, - byteBuf.retainedSlice(dataOffset, dataLength)) + byteBuf.slice(dataOffset, dataLength)) } return this } @@ -69,13 +69,12 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable { blueprintFrame, metadataBuffer, dataBuffer) - blueprintFrame.release() + dispose() return assembled } override fun dispose() { if (isDisposed.compareAndSet(false, true)) { - blueprintFrame.release() dataBuffer.release() metadataBuffer?.release() } diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt index 4fe467ad4..7dd7562f3 100644 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt @@ -33,6 +33,7 @@ class StreamFramesReassemblerTest { val from = Frame.Request.from( 1024, FrameType.REQUEST_RESPONSE, DefaultPayload(data, metadata), 1) + .retain() val frameFragmenter = FrameFragmenter(2) val frameReassembler = StreamFramesReassembler(from) frameFragmenter.fragment(from) @@ -64,6 +65,7 @@ class StreamFramesReassemblerTest { val from = Frame.Request.from( 1024, FrameType.REQUEST_RESPONSE, DefaultPayload(data, metadata), 1) + .retain() val frameFragmenter = FrameFragmenter(2) val frameReassembler = StreamFramesReassembler(from) frameFragmenter.fragment(from) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt index aab013e1b..c79482b8a 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt @@ -2,14 +2,14 @@ package io.rsocket.kotlin.test.transport import io.reactivex.Flowable import io.reactivex.Single -import io.rsocket.kotlin.util.AbstractRSocket +import io.rsocket.kotlin.DefaultPayload import io.rsocket.kotlin.Payload import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.RSocketFactory import io.rsocket.kotlin.transport.netty.client.TcpClientTransport import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable import io.rsocket.kotlin.transport.netty.server.TcpServerTransport -import io.rsocket.kotlin.DefaultPayload +import io.rsocket.kotlin.util.AbstractRSocket import org.junit.Before import org.junit.Test import org.reactivestreams.Publisher @@ -26,7 +26,6 @@ class ClientServerChannelTest { val address = InetSocketAddress .createUnresolved("localhost", 0) val serverTransport = TcpServerTransport.create(address) - channelHandler = ChannelHandler(intervalMillis) server = RSocketFactory .receive()