diff --git a/build.gradle b/build.gradle
index aa262e931..c2f2263f8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -109,11 +109,6 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey')) {
gpg {
sign = true
}
-
- mavenCentralSync {
- user = project.property('sonatypeUsername')
- password = project.property('sonatypePassword')
- }
}
}
}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt
index 5921cce41..8a51688a4 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt
@@ -304,9 +304,11 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
fun leaseEnabled(frame: Frame): Boolean {
ensureFrameType(FrameType.SETUP, frame)
- return Frame.isFlagSet(
- frame.flags(),
- SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE)
+ return SetupFrameFlyweight.supportsLease(frame.flags())
+ }
+
+ fun enableLease(flags: Int): Int {
+ return flags or SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE
}
fun keepaliveInterval(frame: Frame): Int {
@@ -594,7 +596,7 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
object Fragmentation {
fun assembleFrame(blueprintFrame: Frame,
- metadata: ByteBuf,
+ metadata: ByteBuf?,
data: ByteBuf): Frame =
create(blueprintFrame,
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt
index ab92825d3..8115cad7d 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt
@@ -20,6 +20,10 @@ class KeepAliveOptions : KeepAlive {
override fun keepAliveMaxLifeTime() = maxLifeTime
+ fun copy(): KeepAliveOptions = KeepAliveOptions()
+ .keepAliveInterval(interval)
+ .keepAliveMaxLifeTime(maxLifeTime)
+
private fun assertDuration(duration: Duration, name: String) {
if (duration.millis <= 0) {
throw IllegalArgumentException("$name must be positive")
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt
new file mode 100644
index 000000000..e97d63673
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt
@@ -0,0 +1,18 @@
+package io.rsocket.kotlin
+
+import io.reactivex.Completable
+import java.nio.ByteBuffer
+
+/** Provides means to grant lease to peer */
+interface LeaseRef {
+
+ fun grantLease(
+ numberOfRequests: Int,
+ ttlMillis: Long,
+ metadata: ByteBuffer): Completable
+
+ fun grantLease(numberOfRequests: Int,
+ timeToLiveMillis: Long): Completable
+
+ fun onClose(): Completable
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt
index 01a6d7bab..8fd0637ea 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/MediaTypeOptions.kt
@@ -20,6 +20,10 @@ class MediaTypeOptions : MediaType {
override fun metadataMimeType(): String = metadataMimeType
+ fun copy(): MediaTypeOptions = MediaTypeOptions()
+ .dataMimeType(dataMimeType)
+ .metadataMimeType(metadataMimeType)
+
private fun assertMediaType(mediaType: String) {
if (mediaType.isEmpty()) {
throw IllegalArgumentException("media type must be non-empty")
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt
index 4a682886e..ffaac747d 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt
@@ -21,6 +21,8 @@ import io.reactivex.Single
import io.rsocket.kotlin.interceptors.GlobalInterceptors
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.fragmentation.FragmentationInterceptor
+import io.rsocket.kotlin.internal.lease.ClientLeaseSupport
+import io.rsocket.kotlin.internal.lease.ServerLeaseSupport
import io.rsocket.kotlin.transport.ClientTransport
import io.rsocket.kotlin.transport.ServerTransport
import io.rsocket.kotlin.util.AbstractRSocket
@@ -47,6 +49,7 @@ object RSocketFactory {
private var acceptor: ClientAcceptor = { { emptyRSocket } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
+ private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
private val interceptors = GlobalInterceptors.create()
private var flags = 0
private var setupPayload: Payload = DefaultPayload.EMPTY
@@ -76,6 +79,12 @@ object RSocketFactory {
return this
}
+ fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
+ this.flags = Frame.Setup.enableLease(flags)
+ this.leaseRefConsumer = leaseRefConsumer
+ return this
+ }
+
fun errorConsumer(errorConsumer: (Throwable) -> Unit): ClientRSocketFactory {
this.errorConsumer = errorConsumer
return this
@@ -93,36 +102,65 @@ object RSocketFactory {
}
fun transport(transport: () -> ClientTransport): Start =
- ClientStart(transport, interceptors())
+ clientStart(acceptor, transport)
+
+ fun transport(transport: ClientTransport): Start =
+ transport { transport }
fun acceptor(acceptor: ClientAcceptor): ClientTransportAcceptor {
this.acceptor = acceptor
return object : ClientTransportAcceptor {
override fun transport(transport: () -> ClientTransport)
- : Start =
- ClientStart(transport, interceptors())
+ : Start = clientStart(acceptor, transport)
}
}
- private fun interceptors(): InterceptorRegistry =
- interceptors.copyWith {
- it.connectionFirst(
- FragmentationInterceptor(mtu))
- }
+ private fun clientStart(acceptor: ClientAcceptor,
+ transport: () -> ClientTransport): ClientStart =
- private inner class ClientStart(private val transportClient: () -> ClientTransport,
- private val interceptors: InterceptorRegistry)
+ ClientStart(acceptor,
+ errorConsumer,
+ mtu,
+ leaseRefConsumer,
+ flags,
+ setupPayload,
+ keepAlive.copy(),
+ mediaType.copy(),
+ streamRequestLimit,
+ transport,
+ interceptors.copy())
+
+ private class ClientStart(
+ private val acceptor: ClientAcceptor,
+ private val errorConsumer: (Throwable) -> Unit,
+ private var mtu: Int,
+ private val leaseRef: ((LeaseRef) -> Unit)?,
+ private val flags: Int,
+ private val setupPayload: Payload,
+ private val keepAlive: KeepAlive,
+ private val mediaType: MediaType,
+ private val streamRequestLimit: Int,
+ private val transportClient: () -> ClientTransport,
+ private val parentInterceptors: InterceptorRegistry)
: Start {
override fun start(): Single {
return transportClient()
.connect()
.flatMap { connection ->
- val setupFrame = createSetupFrame()
+
+ val withLease =
+ enableLease(parentInterceptors)
+
+ val interceptors =
+ enableFragmentation(withLease)
+
+ val interceptConnection = interceptors as InterceptConnection
+ val interceptRSocket = interceptors as InterceptRSocket
val demuxer = ClientConnectionDemuxer(
connection,
- interceptors)
+ interceptConnection)
val rSocketRequester = RSocketRequester(
demuxer.requesterConnection(),
@@ -130,12 +168,12 @@ object RSocketFactory {
ClientStreamIds(),
streamRequestLimit)
- val wrappedRequester = interceptors
+ val wrappedRequester = interceptRSocket
.interceptRequester(rSocketRequester)
val handlerRSocket = acceptor()(wrappedRequester)
- val wrappedHandler = interceptors
+ val wrappedHandler = interceptRSocket
.interceptHandler(handlerRSocket)
RSocketResponder(
@@ -149,12 +187,21 @@ object RSocketFactory {
keepAlive,
errorConsumer)
+ val setupFrame = createSetupFrame()
+
connection
.sendOne(setupFrame)
.andThen(Single.just(wrappedRequester))
}
}
+ private fun enableFragmentation(parentInterceptors: InterceptorRegistry)
+ : InterceptorRegistry {
+ parentInterceptors.connectionFirst(
+ FragmentationInterceptor(mtu))
+ return parentInterceptors
+ }
+
private fun createSetupFrame(): Frame {
return Frame.Setup.from(
flags,
@@ -164,14 +211,23 @@ object RSocketFactory {
mediaType.dataMimeType(),
setupPayload)
}
+
+ private fun enableLease(parentInterceptors: InterceptorRegistry)
+ : InterceptorRegistry =
+ if (leaseRef != null) {
+ parentInterceptors.copyWith(
+ ClientLeaseSupport.enable(leaseRef)())
+ } else {
+ parentInterceptors.copy()
+ }
}
}
class ServerRSocketFactory internal constructor() {
- private var acceptor: ServerAcceptor = { { _, _ -> Single.just(emptyRSocket) } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
+ private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
private val interceptors = GlobalInterceptors.create()
private var streamRequestLimit = defaultStreamRequestLimit
@@ -186,6 +242,11 @@ object RSocketFactory {
return this
}
+ fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
+ this.leaseRefConsumer = leaseRefConsumer
+ return this
+ }
+
fun errorConsumer(errorConsumer: (Throwable) -> Unit): ServerRSocketFactory {
this.errorConsumer = errorConsumer
return this
@@ -197,26 +258,28 @@ object RSocketFactory {
}
fun acceptor(acceptor: ServerAcceptor): ServerTransportAcceptor {
- this.acceptor = acceptor
return object : ServerTransportAcceptor {
+
override fun transport(
transport: () -> ServerTransport): Start =
- ServerStart(transport, interceptors())
- }
- }
-
- private fun interceptors(): InterceptorRegistry {
- return interceptors.copyWith {
- it.connectionFirst(
- ServerContractInterceptor(errorConsumer))
- it.connectionFirst(
- FragmentationInterceptor(mtu))
+ ServerStart(transport,
+ acceptor,
+ errorConsumer,
+ mtu,
+ leaseRefConsumer,
+ interceptors.copy(),
+ streamRequestLimit)
}
}
- private inner class ServerStart(
+ private class ServerStart(
private val transportServer: () -> ServerTransport,
- private val interceptors: InterceptorRegistry) : Start {
+ private val acceptor: ServerAcceptor,
+ private val errorConsumer: (Throwable) -> Unit,
+ private val mtu: Int,
+ private val leaseRef: ((LeaseRef) -> Unit)?,
+ private val parentInterceptors: InterceptorRegistry,
+ private val streamRequestLimit: Int) : Start {
override fun start(): Single {
return transportServer().start(object
@@ -225,25 +288,37 @@ object RSocketFactory {
override fun invoke(duplexConnection: DuplexConnection)
: Completable {
+ val withLease =
+ enableLease(parentInterceptors)
+
+ val withServerContract =
+ enableServerContract(withLease)
+
+ val interceptors =
+ enableFragmentation(withServerContract)
+
val demuxer = ServerConnectionDemuxer(
duplexConnection,
- interceptors)
+ interceptors as InterceptConnection)
return demuxer
.setupConnection()
.receive()
.firstOrError()
.flatMapCompletable { setup ->
- accept(setup, demuxer)
+ accept(setup,
+ interceptors as InterceptRSocket,
+ demuxer)
}
}
})
}
private fun accept(setupFrame: Frame,
+ interceptors: InterceptRSocket,
demuxer: ConnectionDemuxer): Completable {
- val setup = Setup.create(setupFrame)
+ val setup = SetupContents.create(setupFrame)
val rSocketRequester = RSocketRequester(
demuxer.requesterConnection(),
@@ -272,6 +347,30 @@ object RSocketFactory {
}
.ignoreElement()
}
+
+ private fun enableLease(parentInterceptors: InterceptorRegistry)
+ : InterceptorRegistry =
+ if (leaseRef != null) {
+ parentInterceptors.copyWith(
+ ServerLeaseSupport.enable(leaseRef)())
+ } else {
+ parentInterceptors.copy()
+ }
+
+ private fun enableServerContract(parentInterceptors: InterceptorRegistry)
+ : InterceptorRegistry {
+
+ parentInterceptors.connectionFirst(
+ ServerContractInterceptor(errorConsumer, leaseRef != null))
+ return parentInterceptors
+ }
+
+ private fun enableFragmentation(parentInterceptors: InterceptorRegistry)
+ : InterceptorRegistry {
+ parentInterceptors.connectionFirst(
+ FragmentationInterceptor(mtu))
+ return parentInterceptors
+ }
}
}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt
index 291946a54..f9437cc6e 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Setup.kt
@@ -15,71 +15,13 @@
*/
package io.rsocket.kotlin
-import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_M
-
-import io.rsocket.kotlin.internal.frame.SetupFrameFlyweight
-import java.nio.ByteBuffer
-
/**
* Exposed to server for determination of RequestHandler based on mime types
* and SETUP metadata/data
*/
-abstract class Setup : Payload, KeepAlive {
-
- abstract fun metadataMimeType(): String
-
- abstract fun dataMimeType(): String
-
- abstract val flags: Int
-
- fun willClientHonorLease(): Boolean = Frame.isFlagSet(flags, HONOR_LEASE)
-
- override fun hasMetadata(): Boolean = Frame.isFlagSet(flags, FLAGS_M)
-
- private class SetupImpl(
- private val metadataMimeType: String,
- private val dataMimeType: String,
- override val data: ByteBuffer,
- override val metadata: ByteBuffer,
- private val keepAliveInterval: Int,
- private val keepAliveLifetime: Int,
- override val flags: Int) : Setup() {
-
- init {
- if (!hasMetadata() && metadata.remaining() > 0) {
- throw IllegalArgumentException("metadata flag incorrect")
- }
- }
-
- override fun keepAliveInterval(): Duration =
- Duration.ofMillis(keepAliveInterval)
-
- override fun keepAliveMaxLifeTime(): Duration =
- Duration.ofMillis(keepAliveLifetime)
-
- override fun metadataMimeType(): String = metadataMimeType
-
- override fun dataMimeType(): String = dataMimeType
- }
-
- companion object {
+interface Setup : Payload {
- private const val HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE
+ fun metadataMimeType(): String
- internal fun create(setupFrame: Frame): Setup {
- Frame.ensureFrameType(FrameType.SETUP, setupFrame)
- return try {
- SetupImpl(
- Frame.Setup.metadataMimeType(setupFrame),
- Frame.Setup.dataMimeType(setupFrame),
- setupFrame.data,
- setupFrame.metadata,
- Frame.Setup.keepaliveInterval(setupFrame),
- Frame.Setup.maxLifetime(setupFrame),
- Frame.Setup.getFlags(setupFrame))
- } finally {
- setupFrame.release()
- }
- }
- }
+ fun dataMimeType(): String
}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt
new file mode 100644
index 000000000..cff417969
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt
@@ -0,0 +1,18 @@
+package io.rsocket.kotlin.exceptions
+
+import io.rsocket.kotlin.internal.lease.Lease
+
+class MissingLeaseException(lease: Lease, tag: String)
+ : RejectedException(leaseMessage(lease, tag)) {
+
+ override fun fillInStackTrace(): Throwable = this
+
+ companion object {
+ internal fun leaseMessage(lease: Lease, tag: String): String {
+ val expired = lease.isExpired
+ val allowedRequests = lease.allowedRequests
+ return "$tag: Missing lease. " +
+ "Expired: $expired, allowedRequests: $allowedRequests"
+ }
+ }
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt
index 42e23f82c..6014650c4 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/RejectedException.kt
@@ -17,7 +17,7 @@ package io.rsocket.kotlin.exceptions
import io.rsocket.kotlin.internal.frame.ErrorFrameFlyweight
-class RejectedException : RSocketException, Retryable {
+open class RejectedException : RSocketException, Retryable {
constructor(message: String) : super(message)
constructor(message: String, cause: Throwable) : super(message, cause)
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt
index 3a0384972..92dfd6759 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ConnectionDemuxer.kt
@@ -29,8 +29,8 @@ import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
internal class ServerConnectionDemuxer(source: DuplexConnection,
- plugins: InterceptorRegistry)
- : ConnectionDemuxer(source, plugins) {
+ interceptors: InterceptConnection)
+ : ConnectionDemuxer(source, interceptors) {
override fun demux(frame: Frame): Type {
val streamId = frame.streamId
@@ -45,8 +45,8 @@ internal class ServerConnectionDemuxer(source: DuplexConnection,
}
internal class ClientConnectionDemuxer(source: DuplexConnection,
- plugins: InterceptorRegistry)
- : ConnectionDemuxer(source, plugins) {
+ interceptors: InterceptConnection)
+ : ConnectionDemuxer(source, interceptors) {
override fun demux(frame: Frame): Type {
val streamId = frame.streamId
@@ -61,27 +61,26 @@ internal class ClientConnectionDemuxer(source: DuplexConnection,
}
sealed class ConnectionDemuxer(private val source: DuplexConnection,
- plugins: InterceptorRegistry) {
-
+ interceptors: InterceptConnection) {
private val setupConnection: DuplexConnection
private val responderConnection: DuplexConnection
private val requesterConnection: DuplexConnection
private val serviceConnection: DuplexConnection
init {
- val src = plugins.interceptConnection(ALL, source)
+ val src = interceptors.interceptConnection(ALL, source)
val setupConn = DemuxedConnection(src)
- setupConnection = plugins.interceptConnection(Type.SETUP, setupConn)
+ setupConnection = interceptors.interceptConnection(Type.SETUP, setupConn)
val requesterConn = DemuxedConnection(src)
- requesterConnection = plugins.interceptConnection(REQUESTER, requesterConn)
+ requesterConnection = interceptors.interceptConnection(REQUESTER, requesterConn)
val responderConn = DemuxedConnection(src)
- responderConnection = plugins.interceptConnection(RESPONDER, responderConn)
+ responderConnection = interceptors.interceptConnection(RESPONDER, responderConn)
val serviceConn = DemuxedConnection(src)
- serviceConnection = plugins.interceptConnection(SERVICE, serviceConn)
+ serviceConnection = interceptors.interceptConnection(SERVICE, serviceConn)
src.receive()
.groupBy(::demux)
@@ -126,7 +125,8 @@ sealed class ConnectionDemuxer(private val source: DuplexConnection,
val frames = if (debugEnabled) {
Flowable.fromPublisher(frame)
.doOnNext { f ->
- LOGGER.debug("sending -> " + f.toString()) }
+ LOGGER.debug("sending -> " + f.toString())
+ }
} else {
frame
}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt
index 8e3df235d..1257e1d08 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/InterceptorRegistry.kt
@@ -17,13 +17,16 @@
package io.rsocket.kotlin.internal
import io.rsocket.kotlin.DuplexConnection
+import io.rsocket.kotlin.InterceptorOptions
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor
-import io.rsocket.kotlin.InterceptorOptions
import io.rsocket.kotlin.interceptors.RSocketInterceptor
-import java.util.ArrayList
+import java.util.*
-internal class InterceptorRegistry : InterceptorOptions {
+internal class InterceptorRegistry :
+ InterceptorOptions,
+ InterceptConnection,
+ InterceptRSocket {
private val connections = ArrayList()
private val requesters = ArrayList()
private val handlers = ArrayList()
@@ -36,9 +39,13 @@ internal class InterceptorRegistry : InterceptorOptions {
this.handlers.addAll(interceptorRegistry.handlers)
}
- fun copyWith(action: (InterceptorRegistry) -> Unit): InterceptorRegistry {
- val copy = InterceptorRegistry(this)
- action(copy)
+ fun copy(): InterceptorRegistry = InterceptorRegistry(this)
+
+ fun copyWith(registry: InterceptorRegistry): InterceptorRegistry {
+ val copy = copy()
+ copy.connections.addAll(registry.connections)
+ copy.requesters.addAll(registry.requesters)
+ copy.handlers.addAll(registry.handlers)
return copy
}
@@ -58,7 +65,7 @@ internal class InterceptorRegistry : InterceptorOptions {
handlers.add(interceptor)
}
- fun interceptRequester(rSocket: RSocket): RSocket {
+ override fun interceptRequester(rSocket: RSocket): RSocket {
var rs = rSocket
for (interceptor in requesters) {
rs = interceptor(rs)
@@ -66,7 +73,7 @@ internal class InterceptorRegistry : InterceptorOptions {
return rs
}
- fun interceptHandler(rSocket: RSocket): RSocket {
+ override fun interceptHandler(rSocket: RSocket): RSocket {
var rs = rSocket
for (interceptor in handlers) {
rs = interceptor(rs)
@@ -74,7 +81,7 @@ internal class InterceptorRegistry : InterceptorOptions {
return rs
}
- fun interceptConnection(
+ override fun interceptConnection(
type: DuplexConnectionInterceptor.Type,
connection: DuplexConnection): DuplexConnection {
var conn = connection
@@ -84,3 +91,19 @@ internal class InterceptorRegistry : InterceptorOptions {
return conn
}
}
+
+internal interface InterceptConnection {
+
+ fun interceptConnection(
+ type: DuplexConnectionInterceptor.Type,
+ connection: DuplexConnection): DuplexConnection
+}
+
+internal interface InterceptRSocket {
+
+ fun interceptRequester(rSocket: RSocket): RSocket
+
+ fun interceptHandler(rSocket: RSocket): RSocket
+}
+
+
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt
index 8fc867cb2..b6c2326ff 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerContractInterceptor.kt
@@ -9,9 +9,9 @@ import io.rsocket.kotlin.exceptions.InvalidSetupException
import io.rsocket.kotlin.exceptions.RSocketException
import io.rsocket.kotlin.exceptions.RejectedResumeException
import io.rsocket.kotlin.exceptions.RejectedSetupException
-import io.rsocket.kotlin.internal.frame.SetupFrameFlyweight
import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor
import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor.Type
+import io.rsocket.kotlin.internal.frame.SetupFrameFlyweight
import io.rsocket.kotlin.util.DuplexConnectionProxy
internal class ServerContractInterceptor(
@@ -20,10 +20,10 @@ internal class ServerContractInterceptor(
private val leaseEnabled: Boolean,
private val resumeEnabled: Boolean) : DuplexConnectionInterceptor {
- constructor(errorConsumer: (Throwable) -> Unit) :
+ constructor(errorConsumer: (Throwable) -> Unit, leaseEnabled: Boolean) :
this(errorConsumer,
SetupFrameFlyweight.CURRENT_VERSION,
- false,
+ leaseEnabled,
false)
override fun invoke(type: Type, source: DuplexConnection): DuplexConnection =
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt
index 1b71018df..317908d8b 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServiceHandler.kt
@@ -36,8 +36,9 @@ internal abstract class ServiceHandler(private val serviceConnection: DuplexConn
protected abstract fun handleKeepAlive(frame: Frame)
+ @Suppress("UNUSED_PARAMETER")
private fun handleLease(frame: Frame) {
- errorConsumer(IllegalArgumentException("Lease is not supported: $frame"))
+ /*Lease interceptors processed frame already, just release it here*/
}
private fun handleError(frame: Frame) {
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt
new file mode 100644
index 000000000..5aa82095b
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/SetupContents.kt
@@ -0,0 +1,46 @@
+package io.rsocket.kotlin.internal
+
+import io.rsocket.kotlin.*
+import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight
+import java.nio.ByteBuffer
+
+internal class SetupContents(
+ private val metadataMimeType: String,
+ private val dataMimeType: String,
+ override val data: ByteBuffer,
+ override val metadata: ByteBuffer,
+ private val keepAliveInterval: Int,
+ private val keepAliveLifetime: Int,
+ private val flags: Int) : Setup, KeepAlive {
+
+ override fun keepAliveInterval(): Duration =
+ Duration.ofMillis(keepAliveInterval)
+
+ override fun keepAliveMaxLifeTime(): Duration =
+ Duration.ofMillis(keepAliveLifetime)
+
+ override fun metadataMimeType(): String = metadataMimeType
+
+ override fun dataMimeType(): String = dataMimeType
+
+ override fun hasMetadata(): Boolean = Frame.isFlagSet(flags, FrameHeaderFlyweight.FLAGS_M)
+
+ companion object {
+
+ internal fun create(setupFrame: Frame): SetupContents {
+ Frame.ensureFrameType(FrameType.SETUP, setupFrame)
+ return try {
+ SetupContents(
+ Frame.Setup.metadataMimeType(setupFrame),
+ Frame.Setup.dataMimeType(setupFrame),
+ setupFrame.data,
+ setupFrame.metadata,
+ Frame.Setup.keepaliveInterval(setupFrame),
+ Frame.Setup.maxLifetime(setupFrame),
+ Frame.Setup.getFlags(setupFrame))
+ } finally {
+ setupFrame.release()
+ }
+ }
+ }
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt
index 4d1d13c4a..e42281547 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/FrameFragmenter.kt
@@ -75,9 +75,8 @@ internal class FrameFragmenter(private val mtu: Int) {
}
}
- private class State(frame: Frame) : Disposable {
+ private class State(private val frame: Frame) : Disposable {
private val disposed = AtomicBoolean()
- private val frame: Frame = frame.retain()
private val data: ByteBuf = sliceFrameData(frame.content())
private val metadata: ByteBuf? =
if (frame.hasMetadata())
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt
index 860c26a1a..24750f39d 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassembler.kt
@@ -26,9 +26,13 @@ import java.util.concurrent.atomic.AtomicBoolean
internal class StreamFramesReassembler(frame: Frame) : Disposable {
private val isDisposed = AtomicBoolean()
- private val blueprintFrame = frame.retain()
+ private val blueprintFrame = frame
private val dataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer()
- private val metadataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer()
+ private val metadataBuffer =
+ if (frame.hasMetadata())
+ PooledByteBufAllocator.DEFAULT.compositeBuffer()
+ else
+ null
fun append(frame: Frame): StreamFramesReassembler {
val byteBuf = frame.content()
@@ -37,25 +41,25 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable {
val metadataLength = FrameHeaderFlyweight.metadataLength(
byteBuf,
frameType,
- frameLength)!!
+ frameLength) ?: 0
val dataLength = FrameHeaderFlyweight.dataLength(byteBuf, frameType)
- if (0 < metadataLength) {
+ if (metadataLength > 0) {
var metadataOffset = FrameHeaderFlyweight.metadataOffset(byteBuf)
if (FrameHeaderFlyweight.hasMetadataLengthField(frameType)) {
metadataOffset += FrameHeaderFlyweight.FRAME_LENGTH_SIZE
}
- metadataBuffer.addComponent(
+ metadataBuffer!!.addComponent(
true,
- byteBuf.retainedSlice(metadataOffset, metadataLength))
+ byteBuf.slice(metadataOffset, metadataLength))
}
- if (0 < dataLength) {
+ if (dataLength > 0) {
val dataOffset = FrameHeaderFlyweight.dataOffset(
byteBuf,
frameType,
frameLength)
dataBuffer.addComponent(
true,
- byteBuf.retainedSlice(dataOffset, dataLength))
+ byteBuf.slice(dataOffset, dataLength))
}
return this
}
@@ -65,15 +69,14 @@ internal class StreamFramesReassembler(frame: Frame) : Disposable {
blueprintFrame,
metadataBuffer,
dataBuffer)
- blueprintFrame.release()
+ dispose()
return assembled
}
override fun dispose() {
if (isDisposed.compareAndSet(false, true)) {
- blueprintFrame.release()
dataBuffer.release()
- metadataBuffer.release()
+ metadataBuffer?.release()
}
}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt
index 330e81faf..409a252cc 100644
--- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/frame/SetupFrameFlyweight.kt
@@ -169,6 +169,9 @@ internal object SetupFrameFlyweight {
fun keepaliveInterval(byteBuf: ByteBuf): Int =
byteBuf.getInt(KEEPALIVE_INTERVAL_FIELD_OFFSET)
+ fun supportsLease(flags: Int): Boolean =
+ (flags and FLAGS_WILL_HONOR_LEASE) == FLAGS_WILL_HONOR_LEASE
+
fun maxLifetime(byteBuf: ByteBuf): Int =
byteBuf.getInt(MAX_LIFETIME_FIELD_OFFSET)
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt
new file mode 100644
index 000000000..fbb5dbcf1
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.reactivex.Completable
+import io.rsocket.kotlin.LeaseRef
+import java.nio.ByteBuffer
+
+internal class ConnectionLeaseRef(private val leaseGranterConnection
+ : LeaseGranterConnection) : LeaseRef {
+
+ override fun grantLease(numberOfRequests: Int,
+ ttlMillis: Long,
+ metadata: ByteBuffer): Completable {
+ return grant(
+ numberOfRequests,
+ ttlMillis,
+ metadata)
+ }
+
+ override fun grantLease(numberOfRequests: Int,
+ timeToLiveMillis: Long): Completable {
+ return grant(
+ numberOfRequests,
+ timeToLiveMillis,
+ null)
+ }
+
+ override fun onClose(): Completable = leaseGranterConnection.onClose()
+
+ private fun grant(
+ numberOfRequests: Int,
+ ttlMillis: Long,
+ metadata: ByteBuffer?): Completable {
+ assertArgs(numberOfRequests, ttlMillis)
+ val ttl = Math.toIntExact(ttlMillis)
+ return leaseGranterConnection.grantLease(
+ numberOfRequests,
+ ttl,
+ metadata)
+ }
+
+ private fun assertArgs(numberOfRequests: Int, ttl: Long) {
+ if (numberOfRequests <= 0) {
+ throw IllegalArgumentException(
+ "numberOfRequests must be non-negative: $numberOfRequests")
+ }
+ if (ttl <= 0) {
+ throw IllegalArgumentException(
+ "timeToLive must be non-negative: $ttl")
+ }
+ }
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt
new file mode 100644
index 000000000..9253f1b83
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import java.nio.ByteBuffer
+
+/** A contract for RSocket lease, which is sent by a request acceptor and is time bound. */
+interface Lease {
+
+ /**
+ * Number of requests allowed by this lease.
+ *
+ * @return The number of requests allowed by this lease.
+ */
+ val allowedRequests: Int
+
+ /**
+ * Number of milliseconds that this lease is valid from the time it is received.
+ *
+ * @return Number of milliseconds that this lease is valid from the time it is received.
+ */
+ val ttl: Int
+
+ /**
+ * Metadata for the lease.
+ *
+ * @return Metadata for the lease.
+ */
+ val metadata: ByteBuffer?
+
+ /**
+ * Checks if the lease is expired now.
+ *
+ * @return `true` if the lease has expired.
+ */
+ val isExpired: Boolean
+ get() = isExpired(System.currentTimeMillis())
+
+ /** Checks if the lease has not expired and there are allowed requests available */
+ val isValid: Boolean
+ get() = !isExpired && allowedRequests > 0
+
+ /**
+ * Absolute time since epoch at which this lease will expire.
+ *
+ * @return Absolute time since epoch at which this lease will expire.
+ */
+ fun expiry(): Long
+
+ /**
+ * Checks if the lease is expired for the passed `now`.
+ *
+ * @param now current time in millis.
+ * @return `true` if the lease has expired.
+ */
+ fun isExpired(now: Long): Boolean = now >= expiry()
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt
new file mode 100644
index 000000000..978de9120
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+/** State shared by Lease related interceptors */
+internal class LeaseContext(var leaseEnabled: Boolean = true) {
+
+ override fun toString(): String =
+ "LeaseContext{leaseEnabled=$leaseEnabled}"
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt
new file mode 100644
index 000000000..c6dc64765
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.reactivex.Flowable
+import io.rsocket.kotlin.DuplexConnection
+import io.rsocket.kotlin.Frame
+import io.rsocket.kotlin.FrameType
+import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor
+import io.rsocket.kotlin.util.DuplexConnectionProxy
+
+internal class LeaseEnablingInterceptor(private val leaseContext: LeaseContext)
+ : DuplexConnectionInterceptor {
+
+ override fun invoke(type: DuplexConnectionInterceptor.Type,
+ connection: DuplexConnection)
+ : DuplexConnection =
+ if (type === DuplexConnectionInterceptor.Type.SETUP) {
+ LeaseEnablingConnection(connection, leaseContext)
+ } else {
+ connection
+ }
+
+ private class LeaseEnablingConnection(
+ setupConnection: DuplexConnection,
+ private val leaseContext: LeaseContext)
+ : DuplexConnectionProxy(setupConnection) {
+
+ override fun receive(): Flowable {
+ return super.receive()
+ .doOnNext(
+ { f ->
+ val enabled = f.type == FrameType.SETUP
+ && Frame.Setup.leaseEnabled(f)
+ leaseContext.leaseEnabled = enabled
+ })
+ }
+ }
+}
+
+
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt
new file mode 100644
index 000000000..1b2f812f5
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.reactivex.Completable
+import java.nio.ByteBuffer
+
+interface LeaseGranter {
+
+ fun grantLease(requests: Int, ttl: Int,
+ metadata: ByteBuffer?): Completable
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt
new file mode 100644
index 000000000..7a9b7e6d8
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.netty.buffer.Unpooled
+import io.reactivex.Completable
+import io.reactivex.Flowable
+import io.rsocket.kotlin.DuplexConnection
+import io.rsocket.kotlin.Frame
+import io.rsocket.kotlin.FrameType
+import io.rsocket.kotlin.util.DuplexConnectionProxy
+import org.reactivestreams.Publisher
+import java.nio.ByteBuffer
+
+internal class LeaseGranterConnection(
+ private val leaseContext: LeaseContext,
+ source: DuplexConnection,
+ private val sendManager: LeaseManager,
+ private val receiveManager: LeaseManager)
+ : DuplexConnectionProxy(source), LeaseGranter {
+
+ override fun send(frame: Publisher): Completable {
+ return super.send(Flowable.fromPublisher(frame)
+ .doOnNext { f -> leaseGrantedTo(f, receiveManager) })
+ }
+
+ override fun receive(): Flowable {
+ return super.receive()
+ .doOnNext { f -> leaseGrantedTo(f, sendManager) }
+ }
+
+ private fun leaseGrantedTo(f: Frame, leaseManager: LeaseManager) {
+ if (isEnabled() && isLease(f)) {
+ val requests = Frame.Lease.numberOfRequests(f)
+ val ttl = Frame.Lease.ttl(f)
+ leaseManager.grantLease(requests, ttl)
+ }
+ }
+
+ override fun grantLease(requests: Int,
+ ttl: Int,
+ metadata: ByteBuffer?): Completable {
+
+ val byteBuf = if (metadata == null) Unpooled.EMPTY_BUFFER
+ else Unpooled.wrappedBuffer(metadata)
+
+ return when {
+ ttl <= 0 -> return Completable
+ .error(IllegalArgumentException(
+ "Time-to-live should be positive"))
+ requests <= 0 -> Completable
+ .error(IllegalArgumentException(
+ "Allowed requests should be positive"))
+ else -> send(Flowable.just(Frame.Lease.from(ttl, requests, byteBuf)))
+ }
+ }
+
+ private fun isLease(f: Frame): Boolean {
+ return f.type === FrameType.LEASE
+ }
+
+ private fun isEnabled() = leaseContext.leaseEnabled
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt
new file mode 100644
index 000000000..b538ece44
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.rsocket.kotlin.DuplexConnection
+import io.rsocket.kotlin.LeaseRef
+import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor
+
+internal class LeaseGranterInterceptor(
+ private val leaseContext: LeaseContext,
+ private val sender: LeaseManager,
+ private val receiver: LeaseManager,
+ private val leaseHandle: (LeaseRef) -> Unit)
+ : DuplexConnectionInterceptor {
+
+ override fun invoke(type: DuplexConnectionInterceptor.Type,
+ connection: DuplexConnection): DuplexConnection {
+ return if (type === DuplexConnectionInterceptor.Type.SERVICE) {
+ val leaseGranterConnection = LeaseGranterConnection(
+ leaseContext,
+ connection,
+ sender,
+ receiver)
+ val leaseConnectionRef = ConnectionLeaseRef(leaseGranterConnection)
+ leaseHandle(leaseConnectionRef)
+ leaseGranterConnection
+ } else {
+ connection
+ }
+ }
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt
new file mode 100644
index 000000000..9de07dfba
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt
@@ -0,0 +1,74 @@
+package io.rsocket.kotlin.internal.lease
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+internal class LeaseImpl(private val startingNumberOfRequests: Int,
+ override val ttl: Int,
+ override val metadata: ByteBuffer?) : Lease {
+ private val numberOfRequests: AtomicInteger
+ private val expiry: Long
+
+ init {
+ assertNumberOfRequests(startingNumberOfRequests, ttl)
+ this.numberOfRequests = AtomicInteger(startingNumberOfRequests)
+ this.expiry = now() + ttl
+ }
+
+ override val allowedRequests: Int
+ get() = Math.max(0, numberOfRequests.get())
+
+ override val isValid: Boolean
+ get() = startingNumberOfRequests > 0
+ && allowedRequests > 0
+ && !isExpired
+
+ override fun expiry(): Long {
+ return expiry
+ }
+
+ fun availability(): Double {
+ return if (isValid) allowedRequests /
+ startingNumberOfRequests.toDouble() else 0.0
+ }
+
+ fun use(useRequestCount: Int): Boolean {
+ assertUseRequests(useRequestCount)
+ return !isExpired && numberOfRequests.accAndGet(
+ useRequestCount,
+ { cur, update -> Math.max(-1, cur - update) }) >= 0
+ }
+
+ companion object {
+
+ private fun now() = System.currentTimeMillis()
+
+ private inline fun AtomicInteger.accAndGet(update: Int,
+ acc: (Int, Int) -> Int): Int {
+ var cur: Int
+ var next: Int
+ do {
+ cur = get()
+ next = acc.invoke(cur, update)
+ } while (!compareAndSet(cur, next))
+ return next
+ }
+
+ fun invalidLease(): LeaseImpl = LeaseImpl(0, 0, null)
+
+ fun assertUseRequests(useRequestCount: Int) {
+ if (useRequestCount <= 0) {
+ throw IllegalArgumentException("Number of requests must be positive")
+ }
+ }
+
+ private fun assertNumberOfRequests(numberOfRequests: Int, ttl: Int) {
+ if (numberOfRequests < 0) {
+ throw IllegalArgumentException("Number of requests must be non-negative")
+ }
+ if (ttl < 0) {
+ throw IllegalArgumentException("Time-to-live must be non-negative")
+ }
+ }
+ }
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt
new file mode 100644
index 000000000..b8ae2f4e9
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.rsocket.kotlin.RSocket
+import io.rsocket.kotlin.interceptors.RSocketInterceptor
+
+internal class LeaseInterceptor(private val leaseContext: LeaseContext,
+ private val leaseManager: LeaseManager,
+ private val tag: String)
+ : RSocketInterceptor {
+
+ override fun invoke(rSocket: RSocket): RSocket =
+ LeaseRSocket(
+ leaseContext,
+ rSocket,
+ tag,
+ leaseManager)
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt
new file mode 100644
index 000000000..14224d4c8
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.rsocket.kotlin.exceptions.MissingLeaseException
+
+/** Updates Lease on use and grant */
+internal class LeaseManager(private val tag: String) {
+ @Volatile
+ private var currentLease = INVALID_MUTABLE_LEASE
+
+ init {
+ requireNotNull(tag, { "tag" })
+ }
+
+ fun availability(): Double {
+ return currentLease.availability()
+ }
+
+ fun grantLease(numberOfRequests: Int, ttl: Int) {
+ assertGrantedLease(numberOfRequests, ttl)
+ this.currentLease = LeaseImpl(numberOfRequests, ttl, null)
+ }
+
+ fun useLease(): Result =
+ if (currentLease.use(1))
+ Success
+ else
+ Error(MissingLeaseException(currentLease, tag))
+
+ override fun toString(): String {
+ return "LeaseManager{tag='$tag'}"
+ }
+
+ companion object {
+ private val INVALID_MUTABLE_LEASE = LeaseImpl.invalidLease()
+
+ private fun assertGrantedLease(numberOfRequests: Int, ttl: Int) {
+ if (numberOfRequests <= 0) {
+ throw IllegalArgumentException("numberOfRequests must be positive")
+ }
+ if (ttl <= 0) {
+ throw IllegalArgumentException("time-to-live must be positive")
+ }
+ }
+ }
+}
+
+internal sealed class Result
+
+internal object Success : Result()
+
+internal data class Error(val ex: Throwable) : Result()
\ No newline at end of file
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt
new file mode 100644
index 000000000..7263cc229
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.reactivex.Completable
+import io.reactivex.Flowable
+import io.reactivex.Single
+import io.rsocket.kotlin.Payload
+import io.rsocket.kotlin.RSocket
+import io.rsocket.kotlin.util.RSocketProxy
+import org.reactivestreams.Publisher
+
+internal class LeaseRSocket(
+ private val leaseContext: LeaseContext,
+ source: RSocket, private
+ val tag: String,
+ private val leaseManager: LeaseManager) : RSocketProxy(source) {
+
+ override fun fireAndForget(payload: Payload): Completable {
+ return request(super.fireAndForget(payload))
+ }
+
+ override fun requestResponse(payload: Payload): Single {
+ return request(super.requestResponse(payload))
+ }
+
+ override fun requestStream(payload: Payload): Flowable {
+ return request(super.requestStream(payload))
+ }
+
+ override fun requestChannel(payloads: Publisher): Flowable {
+ return request(super.requestChannel(payloads))
+ }
+
+ override fun availability(): Double {
+ return Math.min(super.availability(), leaseManager.availability())
+ }
+
+ override fun toString(): String =
+ "LeaseRSocket(leaseContext=$leaseContext," +
+ " tag='$tag', " +
+ "leaseManager=$leaseManager)"
+
+ private fun request(actual: Completable): Completable =
+ request(
+ { Completable.defer(it) },
+ actual,
+ { Completable.error(it) })
+
+ private fun request(actual: Single): Single =
+ request(
+ { Single.defer(it) },
+ actual,
+ { Single.error(it) })
+
+ private fun request(actual: Flowable): Flowable =
+ request(
+ { Flowable.defer(it) },
+ actual,
+ { Flowable.error(it) })
+
+ private fun request(
+ defer: (() -> T) -> T,
+ actual: T,
+ error: (Throwable) -> T): T {
+
+ return defer {
+ if (isEnabled()) {
+ val result = leaseManager.useLease()
+ when (result) {
+ is Success -> actual
+ is Error -> error(result.ex)
+ }
+ } else {
+ actual
+ }
+ }
+ }
+
+ private fun isEnabled() = leaseContext.leaseEnabled
+}
diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt
new file mode 100644
index 000000000..f075af7d5
--- /dev/null
+++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 Maksym Ostroverkhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.kotlin.internal.lease
+
+import io.rsocket.kotlin.LeaseRef
+import io.rsocket.kotlin.internal.InterceptorRegistry
+
+internal sealed class LeaseSupport {
+
+ abstract fun enable(leaseHandle: (LeaseRef) -> Unit): () -> InterceptorRegistry
+}
+
+internal object ServerLeaseSupport : LeaseSupport() {
+
+ override fun enable(leaseHandle: (LeaseRef) -> Unit)
+ : () -> InterceptorRegistry = {
+
+ val sender = LeaseManager(serverRequester)
+ val receiver = LeaseManager(serverResponder)
+ val leaseContext = LeaseContext()
+ val registry = InterceptorRegistry()
+
+ /*requester RSocket is Lease aware*/
+ registry.requester(LeaseInterceptor(
+ leaseContext,
+ sender,
+ serverRequester))
+ /*handler RSocket is Lease aware*/
+ registry.handler(LeaseInterceptor(
+ leaseContext,
+ receiver,
+ serverResponder))
+ /*grants Lease quotas of above RSockets*/
+ registry.connection(LeaseGranterInterceptor(
+ leaseContext,
+ sender,
+ receiver,
+ leaseHandle))
+ /*enables lease for particular connection*/
+ registry.connection(LeaseEnablingInterceptor(leaseContext))
+ registry
+ }
+
+ private const val serverRequester = "server requester"
+ private const val serverResponder = "server responder"
+}
+
+internal object ClientLeaseSupport : LeaseSupport() {
+ private val leaseEnabled = LeaseContext()
+
+ override fun enable(leaseHandle: (LeaseRef) -> Unit)
+ : () -> InterceptorRegistry = {
+
+ val sender = LeaseManager(clientRequester)
+ val receiver = LeaseManager(clientResponder)
+ val registry = InterceptorRegistry()
+ /*requester RSocket is Lease aware*/
+ registry.requester(LeaseInterceptor(
+ leaseEnabled,
+ sender, clientRequester))
+ /*handler RSocket is Lease aware*/
+ registry.handler(LeaseInterceptor(
+ leaseEnabled,
+ receiver,
+ clientResponder))
+ /*grants Lease quotas to above RSockets*/
+ registry.connection(LeaseGranterInterceptor(
+ leaseEnabled,
+ sender,
+ receiver,
+ leaseHandle))
+ registry
+ }
+
+ private const val clientRequester = "client requester"
+ private const val clientResponder = "client responder"
+}
diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt
index b08f6c73b..956282a33 100644
--- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt
+++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt
@@ -35,15 +35,6 @@ class ServiceHandlerTest {
conn.close().subscribe()
}
- @Test
- fun serviceHandlerLease() {
- ServerServiceHandler(conn, keepAlive, errors)
- receiver.onNext(Frame.Lease.from(1000, 42, EMPTY_BUFFER))
- val errs = errors.get()
- assertEquals(1, errs.size)
- assertTrue(errs.first() is IllegalArgumentException)
- }
-
@Test
fun serviceHandlerError() {
ServerServiceHandler(conn, keepAlive, errors)
diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt
index eaccd470c..737752f46 100644
--- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt
+++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt
@@ -1,8 +1,8 @@
package io.rsocket.kotlin.frame
-import io.rsocket.kotlin.Frame
-import io.rsocket.kotlin.Setup
import io.rsocket.kotlin.DefaultPayload
+import io.rsocket.kotlin.Frame
+import io.rsocket.kotlin.internal.SetupContents
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -14,7 +14,7 @@ class SetupFrameTest {
"metadataMime",
"dataMime",
DefaultPayload.textPayload("data", "metadata"))
- val setup = Setup.create(setupFrame)
+ val setup = SetupContents.create(setupFrame)
assertEquals(setup.keepAliveInterval().millis, 100)
assertEquals(setup.keepAliveMaxLifeTime().millis, 1000)
assertEquals(setup.metadataMimeType(), "metadataMime")
diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt
index f69bf462e..7dd7562f3 100644
--- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt
+++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/fragmentation/StreamFramesReassemblerTest.kt
@@ -19,8 +19,7 @@ package io.rsocket.kotlin.internal.fragmentation
import io.rsocket.kotlin.Frame
import io.rsocket.kotlin.FrameType
import io.rsocket.kotlin.DefaultPayload
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
+import org.junit.Assert.*
import org.junit.Test
import java.nio.ByteBuffer
import java.util.concurrent.ThreadLocalRandom
@@ -34,6 +33,7 @@ class StreamFramesReassemblerTest {
val from = Frame.Request.from(
1024, FrameType.REQUEST_RESPONSE, DefaultPayload(data, metadata), 1)
+ .retain()
val frameFragmenter = FrameFragmenter(2)
val frameReassembler = StreamFramesReassembler(from)
frameFragmenter.fragment(from)
@@ -58,6 +58,24 @@ class StreamFramesReassemblerTest {
}
}
+ @Test
+ fun testReassembleNullMetadata() {
+ val data = createRandomBytes(16)
+ val metadata = null
+
+ val from = Frame.Request.from(
+ 1024, FrameType.REQUEST_RESPONSE, DefaultPayload(data, metadata), 1)
+ .retain()
+ val frameFragmenter = FrameFragmenter(2)
+ val frameReassembler = StreamFramesReassembler(from)
+ frameFragmenter.fragment(from)
+ .doOnNext { frameReassembler.append(it) }
+ .blockingLast()
+ val reassemble = frameReassembler.reassemble()
+ assertFalse(reassemble.hasMetadata())
+ assertFalse(reassemble.metadata.hasRemaining())
+ }
+
private fun createRandomBytes(size: Int): ByteBuffer {
val bytes = ByteArray(size)
ThreadLocalRandom.current().nextBytes(bytes)
diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt
new file mode 100644
index 000000000..160d49851
--- /dev/null
+++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt
@@ -0,0 +1,74 @@
+package io.rsocket.kotlin.internal.lease
+
+import io.netty.buffer.Unpooled
+import io.reactivex.Completable
+import io.reactivex.Flowable
+import io.reactivex.processors.PublishProcessor
+import io.rsocket.kotlin.Frame
+import io.rsocket.kotlin.FrameType
+import io.rsocket.kotlin.internal.lease.LeaseContext
+import io.rsocket.kotlin.internal.lease.LeaseGranterConnection
+import io.rsocket.kotlin.internal.lease.LeaseManager
+import io.rsocket.kotlin.test.util.LocalDuplexConnection
+import org.junit.Assert.*
+import org.junit.Before
+import org.junit.Test
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+class LeaseGranterConnectionTest {
+
+ private lateinit var leaseGranterConnection: LeaseGranterConnection
+ private lateinit var sender: PublishProcessor
+ private lateinit var receiver: PublishProcessor
+ private lateinit var send: LeaseManager
+ private lateinit var receive: LeaseManager
+
+ @Before
+ fun setUp() {
+ val leaseContext = LeaseContext()
+ send = LeaseManager("send")
+ receive = LeaseManager("receive")
+ sender = PublishProcessor.create()
+ receiver = PublishProcessor.create()
+ val local = LocalDuplexConnection("test", sender, receiver)
+ leaseGranterConnection = LeaseGranterConnection(
+ leaseContext,
+ local,
+ send,
+ receive)
+ }
+
+ @Test
+ fun sentLease() {
+ leaseGranterConnection.send(
+ Flowable.just(Frame.Lease.from(2, 1, Unpooled.EMPTY_BUFFER)))
+ .blockingAwait()
+ assertEquals(1.0, receive.availability(), 1e-5)
+ assertEquals(0.0, send.availability(), 1e-5)
+ }
+
+ @Test
+ fun receivedLease() {
+ leaseGranterConnection.receive().subscribe()
+ receiver.onNext(Frame.Lease.from(2, 1, Unpooled.EMPTY_BUFFER))
+ assertEquals(0.0, receive.availability(), 1e-5)
+ assertEquals(1.0, send.availability(), 1e-5)
+ }
+
+ @Test
+ fun grantLease() {
+ Completable.timer(100, TimeUnit.MILLISECONDS)
+ .andThen(Completable.defer {
+ leaseGranterConnection
+ .grantLease(2, 1, ByteBuffer.allocateDirect(0))
+ })
+ .subscribe()
+ val f = sender.firstOrError().blockingGet()
+
+ assertNotNull(f)
+ assertTrue(f.type === FrameType.LEASE)
+ assertEquals(2, Frame.Lease.numberOfRequests(f))
+ assertEquals(1, Frame.Lease.ttl(f))
+ }
+}
diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt
new file mode 100644
index 000000000..3f5581664
--- /dev/null
+++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt
@@ -0,0 +1,65 @@
+package io.rsocket.kotlin.internal.lease
+
+import io.reactivex.Completable
+import io.rsocket.kotlin.exceptions.MissingLeaseException
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.TimeUnit
+
+class LeaseManagerTest {
+
+ private lateinit var leaseManager: LeaseManager
+
+ @Before
+ fun setUp() {
+ leaseManager = LeaseManager("")
+ }
+
+ @Test
+ fun initialLeaseAvailability() {
+ assertEquals(0.0, leaseManager.availability(), 1e-5)
+ }
+
+ @Test
+ fun useNoRequests() {
+ val result = leaseManager.useLease()
+ assertTrue(result is Error)
+ result as Error
+ assertTrue(result.ex is MissingLeaseException)
+ }
+
+ @Test
+ fun grant() {
+ leaseManager.grantLease(2, 1_000)
+ assertEquals(1.0, leaseManager.availability(), 1e-5)
+ }
+
+ @Test(expected = IllegalArgumentException::class)
+ fun grantLeaseZeroRequests() {
+ leaseManager.grantLease(0, 1_000)
+ }
+
+ @Test(expected = IllegalArgumentException::class)
+ fun grantLeaseZeroTtl() {
+ leaseManager.grantLease(1, 0)
+ }
+
+ @Test
+ fun use() {
+ leaseManager.grantLease(2, 1_000)
+ leaseManager.useLease()
+ assertEquals(0.5, leaseManager.availability(), 1e-5)
+ }
+
+ @Test
+ fun useTimeout() {
+ leaseManager.grantLease(2, 1_000)
+ Completable.timer(1500, TimeUnit.MILLISECONDS).blockingAwait()
+ val result = leaseManager.useLease()
+ assertTrue(result is Error)
+ result as Error
+ assertTrue(result.ex is MissingLeaseException)
+ }
+}
diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt
new file mode 100644
index 000000000..250365eb2
--- /dev/null
+++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt
@@ -0,0 +1,88 @@
+package io.rsocket.kotlin.internal.lease
+
+import io.reactivex.Completable
+import io.reactivex.Flowable
+import io.reactivex.Single
+import io.rsocket.kotlin.DefaultPayload
+import io.rsocket.kotlin.Payload
+import io.rsocket.kotlin.RSocket
+import io.rsocket.kotlin.exceptions.MissingLeaseException
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+import org.reactivestreams.Publisher
+
+class LeaseRSocketTest {
+
+ private lateinit var leaseRSocket: LeaseRSocket
+ private lateinit var rSocket: MockRSocket
+ private lateinit var leaseManager: LeaseManager
+
+ @Before
+ fun setUp() {
+ rSocket = MockRSocket()
+ leaseManager = LeaseManager("")
+ val leaseEnabled = LeaseContext()
+ leaseRSocket = LeaseRSocket(leaseEnabled, rSocket, "", leaseManager)
+ }
+
+ @Test
+ fun grantedLease() {
+ leaseManager.grantLease(2, 1_000)
+ assertEquals(1.0, leaseRSocket.availability(), 1e-5)
+ }
+
+ @Test
+ fun usedLease() {
+ leaseManager.grantLease(2, 1_000)
+ leaseRSocket.fireAndForget(DefaultPayload("test")).subscribe()
+ assertEquals(0.5, leaseRSocket.availability(), 1e-5)
+ }
+
+ @Test
+ fun depletedLease() {
+ leaseManager.grantLease(1, 1_000)
+ val fireAndForget = leaseRSocket.fireAndForget(DefaultPayload("test"))
+ val firstErr = fireAndForget.blockingGet()
+ assertTrue(firstErr == null)
+ val secondErr = fireAndForget.blockingGet()
+ assertTrue(secondErr is MissingLeaseException)
+ }
+
+ @Test
+ fun connectionNotAvailable() {
+ leaseManager.grantLease(1, 1_000)
+ rSocket.setAvailability(0.0f)
+ assertEquals(0.0, leaseRSocket.availability(), 1e-5)
+ }
+
+ private class MockRSocket : RSocket {
+ private var availability = 1.0f
+
+ fun setAvailability(availability: Float) {
+ this.availability = availability
+ }
+
+ override fun availability(): Double = availability.toDouble()
+
+ override fun fireAndForget(payload: Payload): Completable =
+ Completable.complete()
+
+ override fun requestResponse(payload: Payload): Single =
+ Single.just(payload)
+
+ override fun requestStream(payload: Payload): Flowable =
+ Flowable.just(payload)
+
+ override fun requestChannel(payloads: Publisher): Flowable =
+ Flowable.fromPublisher(payloads)
+
+ override fun metadataPush(payload: Payload): Completable =
+ Completable.complete()
+
+ override fun close(): Completable = Completable.complete()
+
+ override fun onClose(): Completable = Completable.complete()
+ }
+}
\ No newline at end of file
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt
new file mode 100644
index 000000000..238149ed9
--- /dev/null
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt
@@ -0,0 +1,118 @@
+package io.rsocket.kotlin.test.lease
+
+import io.reactivex.Single
+import io.reactivex.processors.BehaviorProcessor
+import io.reactivex.subscribers.TestSubscriber
+import io.rsocket.kotlin.*
+import io.rsocket.kotlin.exceptions.MissingLeaseException
+import io.rsocket.kotlin.transport.netty.client.TcpClientTransport
+import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable
+import io.rsocket.kotlin.transport.netty.server.TcpServerTransport
+import io.rsocket.kotlin.util.AbstractRSocket
+import org.junit.After
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.TimeUnit
+
+class LeaseServerTest {
+ private lateinit var nettyContextCloseable: NettyContextCloseable
+ private lateinit var serverLease: LeaseRefs
+ private lateinit var clientSocket: RSocket
+ private lateinit var leaseRef: LeaseRef
+ @Before
+ fun setUp() {
+ serverLease = LeaseRefs()
+ nettyContextCloseable = RSocketFactory.receive()
+ .enableLease(serverLease)
+ .acceptor {
+ { _, _ ->
+ Single.just(
+ object : AbstractRSocket() {
+ override fun requestResponse(payload: Payload)
+ : Single =
+ Single.just(payload)
+ })
+ }
+ }
+ .transport(TcpServerTransport.create("localhost", 0))
+ .start()
+ .blockingGet()
+
+ val address = nettyContextCloseable.address()
+ clientSocket = RSocketFactory
+ .connect()
+ .enableLease(LeaseRefs())
+ .keepAlive { opts ->
+ opts.keepAliveInterval(Duration.ofMinutes(1))
+ .keepAliveMaxLifeTime(Duration.ofMinutes(20))
+ }
+ .transport(TcpClientTransport.create(address))
+ .start()
+ .blockingGet()
+
+ leaseRef = serverLease.leaseRef().blockingGet()
+ }
+
+ @After
+ fun tearDown() {
+ clientSocket.close().subscribe()
+ nettyContextCloseable.close().subscribe()
+ nettyContextCloseable.onClose().blockingAwait()
+ }
+
+ @Test
+ fun grantLeaseNumberOfRequests() {
+ assertEquals(clientSocket.availability(), 0.0, 1e-5)
+ leaseRef.grantLease(2, 10_000)
+ .delay(100, TimeUnit.MILLISECONDS)
+ .blockingAwait()
+ assertEquals(clientSocket.availability(), 1.0, 1e-5)
+ clientSocket.requestResponse(payload())
+ .blockingGet()
+ assertEquals(clientSocket.availability(), 0.5, 1e-5)
+ clientSocket.requestResponse(payload())
+ .blockingGet()
+ assertEquals(clientSocket.availability(), 0.0, 1e-5)
+
+ val subscriber = TestSubscriber()
+ clientSocket.requestResponse(payload())
+ .toFlowable()
+ .blockingSubscribe(subscriber)
+ assertEquals(1, subscriber.errorCount())
+ assertTrue(subscriber.errors().first() is MissingLeaseException)
+ leaseRef.grantLease(1, 10_000)
+ .delay(100, TimeUnit.MILLISECONDS)
+ .blockingAwait()
+ assertEquals(1.0, clientSocket.availability(), 1e-5)
+ }
+
+ @Test
+ fun grantLeaseTtl() {
+ leaseRef.grantLease(2, 200)
+ .delay(250, TimeUnit.MILLISECONDS)
+ .blockingAwait()
+
+ assertEquals(clientSocket.availability(), 0.0, 1e-5)
+ val subscriber = TestSubscriber()
+ clientSocket.requestResponse(payload()).toFlowable()
+ .blockingSubscribe(subscriber)
+
+ assertEquals(1, subscriber.errorCount())
+ assertTrue(subscriber.errors().first() is MissingLeaseException)
+ }
+
+ private fun payload() = DefaultPayload("data")
+
+ private class LeaseRefs : (LeaseRef) -> Unit {
+ private val leaseRefs = BehaviorProcessor.create()
+
+ fun leaseRef(): Single = leaseRefs.firstOrError()
+
+ override fun invoke(leaseRef: LeaseRef) {
+ leaseRefs.onNext(leaseRef)
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt
new file mode 100644
index 000000000..74e650e0d
--- /dev/null
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt
@@ -0,0 +1,86 @@
+package io.rsocket.kotlin.test.lease.example
+
+import io.reactivex.Flowable
+import io.reactivex.Single
+import io.reactivex.processors.BehaviorProcessor
+import io.reactivex.schedulers.Schedulers
+import io.rsocket.kotlin.*
+import io.rsocket.kotlin.transport.netty.client.TcpClientTransport
+import io.rsocket.kotlin.transport.netty.server.TcpServerTransport
+import io.rsocket.kotlin.util.AbstractRSocket
+import org.slf4j.LoggerFactory
+import java.util.*
+import java.util.concurrent.TimeUnit
+
+object LeaseClientServerExample {
+ private val LOGGER = LoggerFactory.getLogger(LeaseClientServerExample::class.java)
+
+ @JvmStatic
+ fun main(args: Array) {
+
+ val serverLease = LeaseRefs()
+ val nettyContextCloseable = RSocketFactory.receive()
+ .enableLease(serverLease)
+ .acceptor {
+ { _, _ ->
+ Single.just(
+ object : AbstractRSocket() {
+ override fun requestResponse(payload: Payload)
+ : Single =
+ Single.just(DefaultPayload("Server Response ${Date()}"))
+ })
+ }
+ }
+ .transport(TcpServerTransport.create("localhost", 0))
+ .start()
+ .blockingGet()
+
+ val address = nettyContextCloseable.address()
+ val clientSocket = RSocketFactory.connect()
+ .enableLease(LeaseRefs())
+ .keepAlive { opts ->
+ opts.keepAliveInterval(Duration.ofMinutes(1))
+ .keepAliveMaxLifeTime(Duration.ofMinutes(20))
+ }
+ .transport(TcpClientTransport.create(address))
+ .start()
+ .blockingGet()
+
+ Flowable.interval(1, TimeUnit.SECONDS)
+ .observeOn(Schedulers.io())
+ .flatMap {
+ LOGGER.info("Availability: ${clientSocket.availability()}")
+ clientSocket
+ .requestResponse(DefaultPayload("Client request ${Date()}"))
+ .toFlowable()
+ .doOnError { LOGGER.info("Error: $it") }
+ .onErrorResumeNext { _: Throwable ->
+ Flowable.empty()
+ }
+ }
+ .subscribe { resp -> LOGGER.info("Client response: ${resp.dataUtf8}") }
+
+ serverLease
+ .leaseRef()
+ .flatMapCompletable { connRef ->
+ Flowable.interval(1, 10, TimeUnit.SECONDS)
+ .flatMapCompletable { _ ->
+ connRef.grantLease(
+ numberOfRequests = 7,
+ timeToLiveMillis = 5_000)
+ }
+ }.subscribe()
+
+ clientSocket.onClose().blockingAwait()
+ }
+
+ private class LeaseRefs : (LeaseRef) -> Unit {
+ private val leaseRefs = BehaviorProcessor.create()
+
+ fun leaseRef(): Single = leaseRefs.firstOrError()
+
+ override fun invoke(leaseRef: LeaseRef) {
+ leaseRefs.onNext(leaseRef)
+ }
+ }
+}
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/ClientServerChannelTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt
similarity index 98%
rename from test/src/test/kotlin/io/rsocket/kotlin/test/ClientServerChannelTest.kt
rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt
index 52fcf9e23..c79482b8a 100644
--- a/test/src/test/kotlin/io/rsocket/kotlin/test/ClientServerChannelTest.kt
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt
@@ -1,15 +1,15 @@
-package io.rsocket.kotlin.test
+package io.rsocket.kotlin.test.transport
import io.reactivex.Flowable
import io.reactivex.Single
-import io.rsocket.kotlin.util.AbstractRSocket
+import io.rsocket.kotlin.DefaultPayload
import io.rsocket.kotlin.Payload
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.RSocketFactory
import io.rsocket.kotlin.transport.netty.client.TcpClientTransport
import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable
import io.rsocket.kotlin.transport.netty.server.TcpServerTransport
-import io.rsocket.kotlin.DefaultPayload
+import io.rsocket.kotlin.util.AbstractRSocket
import org.junit.Before
import org.junit.Test
import org.reactivestreams.Publisher
@@ -26,7 +26,6 @@ class ClientServerChannelTest {
val address = InetSocketAddress
.createUnresolved("localhost", 0)
val serverTransport = TcpServerTransport.create(address)
-
channelHandler = ChannelHandler(intervalMillis)
server = RSocketFactory
.receive()
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/EndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt
similarity index 99%
rename from test/src/test/kotlin/io/rsocket/kotlin/test/EndToEndTest.kt
rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt
index 292fa4c2b..7489152f7 100644
--- a/test/src/test/kotlin/io/rsocket/kotlin/test/EndToEndTest.kt
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt
@@ -1,4 +1,4 @@
-package io.rsocket.kotlin.test
+package io.rsocket.kotlin.test.transport
import io.reactivex.Completable
import io.reactivex.Flowable
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyTcpEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt
similarity index 88%
rename from test/src/test/kotlin/io/rsocket/kotlin/test/NettyTcpEndToEndTest.kt
rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt
index a9834b372..31632083f 100644
--- a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyTcpEndToEndTest.kt
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt
@@ -1,4 +1,4 @@
-package io.rsocket.kotlin.test
+package io.rsocket.kotlin.test.transport
import io.rsocket.kotlin.transport.netty.client.TcpClientTransport
import io.rsocket.kotlin.transport.netty.server.TcpServerTransport
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyWebsocketEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt
similarity index 83%
rename from test/src/test/kotlin/io/rsocket/kotlin/test/NettyWebsocketEndToEndTest.kt
rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt
index eb1e1f2d8..cf9f1af98 100644
--- a/test/src/test/kotlin/io/rsocket/kotlin/test/NettyWebsocketEndToEndTest.kt
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt
@@ -1,5 +1,6 @@
-package io.rsocket.kotlin.test
+package io.rsocket.kotlin.test.transport
+import io.rsocket.kotlin.test.transport.EndToEndTest
import io.rsocket.kotlin.transport.netty.client.WebsocketClientTransport
import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport
diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/OkHttpNettyEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt
similarity index 85%
rename from test/src/test/kotlin/io/rsocket/kotlin/test/OkHttpNettyEndToEndTest.kt
rename to test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt
index 33dd9c8db..c1817e12a 100644
--- a/test/src/test/kotlin/io/rsocket/kotlin/test/OkHttpNettyEndToEndTest.kt
+++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt
@@ -1,5 +1,6 @@
-package io.rsocket.kotlin.test
+package io.rsocket.kotlin.test.transport
+import io.rsocket.kotlin.test.transport.EndToEndTest
import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport
import io.rsocket.transport.okhttp.client.OkhttpWebsocketClientTransport
import okhttp3.HttpUrl