diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseGranter.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseGranter.kt deleted file mode 100644 index c771b23a9..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseGranter.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin - -import io.reactivex.Completable - -/** - * Grants Lease to its peer - */ -interface LeaseGranter { - /** - * Grants lease to its peer - * - * @param numberOfRequests number of requests peer is allowed to perform. - * Must be positive - * @param ttlSeconds number of seconds that this lease is valid from the time - * it is received. - * @return [Completable] which signals error if underlying RSocket is closed, - * completes successfully otherwise - */ - fun grant(numberOfRequests: Int, - ttlSeconds: Int): Completable - - /** - * @return [Completable] which completes when RSocket is closed - */ - fun onClose(): Completable -} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseOptions.kt deleted file mode 100644 index 31f10ea48..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseOptions.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin - -/** - * Configures lease options of client and server RSocket - */ -class LeaseOptions { - - private var rSocketLeaseConsumer: ((RSocketLease) -> Unit)? = null - - /** - * Enables lease feature of RSocket - - * @param rSocketLeaseConsumer consumer of [RSocketLease] for each established RSocket - * @return this [LeaseOptions] - */ - fun enableLease(rSocketLeaseConsumer: (RSocketLease) -> Unit): LeaseOptions { - this.rSocketLeaseConsumer = rSocketLeaseConsumer - return this - } - - internal fun rSocketLeaseConsumer(): ((RSocketLease) -> Unit)? = rSocketLeaseConsumer - - internal fun copy(): LeaseOptions { - val opts = LeaseOptions() - rSocketLeaseConsumer?.let { - opts.enableLease(it) - } - return opts - } -} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt index d7b510676..37ce7c03d 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt @@ -21,8 +21,6 @@ import io.reactivex.Single import io.rsocket.kotlin.interceptors.GlobalInterceptors import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.internal.fragmentation.FragmentationInterceptor -import io.rsocket.kotlin.internal.lease.ClientLeaseFeature -import io.rsocket.kotlin.internal.lease.ServerLeaseFeature import io.rsocket.kotlin.transport.ClientTransport import io.rsocket.kotlin.transport.ServerTransport import io.rsocket.kotlin.util.AbstractRSocket @@ -46,7 +44,6 @@ object RSocketFactory { private var acceptor: ClientAcceptor = { { emptyRSocket } } private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() } private var mtu = 0 - private val leaseOptions = LeaseOptions() private val interceptors = GlobalInterceptors.create() private var flags = 0 private var setupPayload: Payload = DefaultPayload.EMPTY @@ -101,20 +98,6 @@ object RSocketFactory { return this } - /** - * Configure lease support - * - * @param configure function which configures [LeaseOptions] - * @return this ClientRSocketFactory - */ - fun lease(configure: (LeaseOptions) -> Unit): ClientRSocketFactory { - configure(leaseOptions) - if (leaseOptions.rSocketLeaseConsumer() != null) { - this.flags = Frame.Setup.enableLease(flags) - } - return this - } - /** * Sets consumer for errors which are out of scope of RSocket interaction * requests (e.g. request-response, request-stream and so on) @@ -191,7 +174,6 @@ object RSocketFactory { mtu, flags, setupPayload, - leaseOptions.copy(), keepAlive.copy(), mediaType.copy(), options.copy(), @@ -204,7 +186,6 @@ object RSocketFactory { private var mtu: Int, private val flags: Int, private val setupPayload: Payload, - private val leaseOptions: LeaseOptions, keepAliveOpts: KeepAliveOptions, private val mediaType: MediaType, options: RSocketOptions, @@ -220,11 +201,8 @@ object RSocketFactory { return transportClient() .connect() .flatMap { connection -> - - val enabledLease = - enableLease(parentInterceptors) val enabledFragmentation = - enableFragmentation(enabledLease) + enableFragmentation(parentInterceptors) val interceptConnection = enabledFragmentation as InterceptConnection val interceptRSocket = @@ -284,18 +262,6 @@ object RSocketFactory { mediaType.dataMimeType(), setupPayload) } - - private fun enableLease(parentInterceptors: InterceptorRegistry) - : InterceptorRegistry { - val rSocketLeaseConsumer = - leaseOptions.rSocketLeaseConsumer() - return if (rSocketLeaseConsumer != null) { - parentInterceptors.copyWith( - ClientLeaseFeature.enable(rSocketLeaseConsumer)()) - } else { - parentInterceptors.copy() - } - } } } @@ -303,7 +269,6 @@ object RSocketFactory { private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() } private var mtu = 0 - private val leaseOptions = LeaseOptions() private val interceptors = GlobalInterceptors.create() private val options = RSocketOptions() @@ -331,17 +296,6 @@ object RSocketFactory { return this } - /** - * Configure lease support - * - * @param configure function which configures [LeaseOptions] - * @return this ServerRSocketFactory - */ - fun lease(configure: (LeaseOptions) -> Unit): ServerRSocketFactory { - configure(leaseOptions) - return this - } - /** * Sets consumer for errors which are out of scope of RSocket interaction * requests (e.g. request-response, request-stream and so on) @@ -382,7 +336,6 @@ object RSocketFactory { acceptor, errorConsumer, mtu, - leaseOptions.copy(), interceptors.copy(), options.copy()) } @@ -393,7 +346,6 @@ object RSocketFactory { private val acceptor: ServerAcceptor, private val errorConsumer: (Throwable) -> Unit, private val mtu: Int, - private val leaseOptions: LeaseOptions, private val parentInterceptors: InterceptorRegistry, options: RSocketOptions) : Start { @@ -406,10 +358,8 @@ object RSocketFactory { override fun invoke(duplexConnection: DuplexConnection) : Completable { - val enabledLease = - enableLease(parentInterceptors) val enabledServerContract = - enableServerContract(enabledLease) + enableServerContract(parentInterceptors) val enabledFragmentation = enableFragmentation(enabledServerContract) val interceptConnection = @@ -472,24 +422,11 @@ object RSocketFactory { .ignoreElement() } - private fun enableLease(parentInterceptors: InterceptorRegistry) - : InterceptorRegistry { - val rSocketLeaseConsumer = - leaseOptions.rSocketLeaseConsumer() - return if (rSocketLeaseConsumer != null) { - parentInterceptors.copyWith( - ServerLeaseFeature.enable(rSocketLeaseConsumer)()) - } else { - parentInterceptors.copy() - } - } - private fun enableServerContract(parentInterceptors: InterceptorRegistry) : InterceptorRegistry { parentInterceptors.connectionFirst( - ServerContractInterceptor(errorConsumer, - leaseOptions.rSocketLeaseConsumer() != null)) + ServerContractInterceptor(errorConsumer, false)) return parentInterceptors } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketLease.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketLease.kt deleted file mode 100644 index d9f3468ce..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketLease.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin - -import io.rsocket.kotlin.internal.lease.LeaseConnection - -/** - * Provides means to grant leases to peer - */ -class RSocketLease internal constructor(private val leaseConnection - : LeaseConnection) { - /** - * @return [LeaseGranter] which is used to grant leases to peer - */ - fun granter(): LeaseGranter = leaseConnection -} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt deleted file mode 100644 index 9fc041d31..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/exceptions/MissingLeaseException.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.exceptions - -import io.rsocket.kotlin.internal.lease.Lease - -class MissingLeaseException internal constructor(lease: Lease, tag: String) - : RejectedException(leaseMessage(lease, tag)) { - - override fun fillInStackTrace(): Throwable = this - - companion object { - internal fun leaseMessage(lease: Lease, tag: String): String { - val expired = lease.isExpired - val allowedRequests = lease.allowedRequests - val initialAllowedRequests = lease.initialAllowedRequests - return "$tag: Missing lease. " + - "Expired: $expired, allowedRequests:" + - " $allowedRequests/$initialAllowedRequests" - } - } -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt deleted file mode 100644 index a82069131..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/Lease.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -/** A contract for RSocket lease, which is time bound. */ -internal interface Lease { - - /** - * @return The number of requests allowed by this lease. - */ - val allowedRequests: Int - - /** - * @return Initial number of requests allowed by this lease. - */ - val initialAllowedRequests: Int - - /** - * @return Number of seconds that this lease is valid from the time - * it is received. - */ - val timeToLiveSeconds: Int - - /** - * @return `true` if the lease has expired, false otherwise. - */ - val isExpired: Boolean - get() = isExpired(System.currentTimeMillis()) - - /** - * @return true if lease has not expired, and there are allowed requests - * available, false otherwise - */ - val isValid: Boolean - - /** - * @return Absolute time since epoch at which this lease will expire. - */ - val expiry: Long - - /** - * @param now current time in millis. - * @return `true` if the lease has expired for given `now`, false otherwise. - */ - fun isExpired(now: Long): Boolean = now >= expiry -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseConnection.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseConnection.kt deleted file mode 100644 index 59cce142c..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseConnection.kt +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.netty.buffer.Unpooled -import io.reactivex.Completable -import io.reactivex.Flowable -import io.rsocket.kotlin.* -import io.rsocket.kotlin.util.DuplexConnectionProxy -import org.reactivestreams.Publisher -import java.nio.channels.ClosedChannelException -import java.util.concurrent.atomic.AtomicReference - -internal class LeaseConnection( - private val leaseContext: LeaseContext, - source: DuplexConnection, - private val sendManager: LeaseManager, - private val receiveManager: LeaseManager) - : DuplexConnectionProxy(source), LeaseGranter { - - @Volatile - private var closed = false - - override fun grant(numberOfRequests: Int, - ttlSeconds: Int): Completable = - sendLease(numberOfRequests, ttlSeconds) - - override fun send(frame: Publisher): Completable { - return super.send(Flowable.fromPublisher(frame) - .doOnNext { f -> leaseGrantedTo(f, receiveManager) }) - } - - override fun receive(): Flowable { - return super.receive() - .doOnNext { f -> leaseGrantedTo(f, sendManager) } - } - - override fun onClose(): Completable { - return super.onClose() - .doOnComplete { closed = true } - } - - private fun leaseGrantedTo(f: Frame, - leaseManager: LeaseManager) { - if (isEnabled() && isLease(f)) { - val requests = Frame.Lease.numberOfRequests(f) - val ttl = Frame.Lease.ttl(f) - leaseManager.grant(requests, ttl) - } - } - - private fun sendLease(requests: Int, - ttl: Int): Completable = when { - ttl <= 0 -> Completable - .error(IllegalArgumentException( - "Time-to-live should be positive")) - requests <= 0 -> Completable - .error(IllegalArgumentException( - "Allowed requests should be positive")) - else -> - if (closed) { - Completable.error(ClosedChannelException()) - } else { - send(Flowable.just(Frame.Lease.from(ttl, requests, Unpooled.EMPTY_BUFFER))) - } - } - - private fun isLease(f: Frame) = f.type === FrameType.LEASE - - private fun isEnabled() = leaseContext.leaseEnabled -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt deleted file mode 100644 index 28a714265..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -/** State shared by Lease related interceptors */ -internal class LeaseContext(var leaseEnabled: Boolean = true) { - - override fun toString(): String = - "LeaseContext{leaseEnabled=$leaseEnabled}" -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt deleted file mode 100644 index a775d1f77..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.reactivex.Flowable -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.Frame -import io.rsocket.kotlin.FrameType -import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor -import io.rsocket.kotlin.util.DuplexConnectionProxy - -internal class LeaseEnablingInterceptor(private val leaseContext: LeaseContext) - : DuplexConnectionInterceptor { - - override fun invoke(type: DuplexConnectionInterceptor.Type, - connection: DuplexConnection) - : DuplexConnection = - if (type === DuplexConnectionInterceptor.Type.SETUP) { - LeaseEnablingConnection(connection, leaseContext) - } else { - connection - } - - private class LeaseEnablingConnection( - setupConnection: DuplexConnection, - private val leaseContext: LeaseContext) - : DuplexConnectionProxy(setupConnection) { - - override fun receive(): Flowable { - return super.receive() - .doOnNext( - { f -> - val enabled = f.type == FrameType.SETUP - && Frame.Setup.leaseEnabled(f) - leaseContext.leaseEnabled = enabled - }) - } - } -} - - diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseFeature.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseFeature.kt deleted file mode 100644 index 980e6db3b..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseFeature.kt +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.rsocket.kotlin.RSocketLease -import io.rsocket.kotlin.internal.InterceptorRegistry - -internal sealed class LeaseFeature { - - abstract fun enable(leaseSupport: (RSocketLease) -> Unit): () -> InterceptorRegistry -} - -internal object ServerLeaseFeature : LeaseFeature() { - - override fun enable(leaseSupport: (RSocketLease) -> Unit) - : () -> InterceptorRegistry = { - - val sender = LeaseManager(serverRequester) - val receiver = LeaseManager(serverResponder) - val leaseContext = LeaseContext() - val registry = InterceptorRegistry() - - /*requester RSocket is Lease aware*/ - registry.requester(LeaseInterceptor( - leaseContext, - sender, - serverRequester)) - /*handler RSocket is Lease aware*/ - registry.handler(LeaseInterceptor( - leaseContext, - receiver, - serverResponder)) - /*grants Lease quotas of above RSockets*/ - registry.connection(LeaseGranterInterceptor( - leaseContext, - sender, - receiver, - leaseSupport)) - /*enables lease for particular connection*/ - registry.connection(LeaseEnablingInterceptor(leaseContext)) - registry - } - - private const val serverRequester = "server requester" - private const val serverResponder = "server responder" -} - -internal object ClientLeaseFeature : LeaseFeature() { - private val leaseEnabled = LeaseContext() - - override fun enable(leaseSupport: (RSocketLease) -> Unit) - : () -> InterceptorRegistry = { - - val sender = LeaseManager(clientRequester) - val receiver = LeaseManager(clientResponder) - val registry = InterceptorRegistry() - /*requester RSocket is Lease aware*/ - registry.requester(LeaseInterceptor( - leaseEnabled, - sender, clientRequester)) - /*handler RSocket is Lease aware*/ - registry.handler(LeaseInterceptor( - leaseEnabled, - receiver, - clientResponder)) - /*grants Lease quotas to above RSockets*/ - registry.connection(LeaseGranterInterceptor( - leaseEnabled, - sender, - receiver, - leaseSupport)) - registry - } - - private const val clientRequester = "client requester" - private const val clientResponder = "client responder" -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt deleted file mode 100644 index dd368ee6c..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.RSocketLease -import io.rsocket.kotlin.interceptors.DuplexConnectionInterceptor - -internal class LeaseGranterInterceptor( - private val leaseContext: LeaseContext, - private val sender: LeaseManager, - private val receiver: LeaseManager, - private val rSocketLeaseConsumer: (RSocketLease) -> Unit) - : DuplexConnectionInterceptor { - - override fun invoke(type: DuplexConnectionInterceptor.Type, - connection: DuplexConnection): DuplexConnection { - return if (type === DuplexConnectionInterceptor.Type.SERVICE) { - val leaseConnection = LeaseConnection( - leaseContext, - connection, - sender, - receiver) - val rSocketLease = RSocketLease(leaseConnection) - rSocketLeaseConsumer(rSocketLease) - leaseConnection - } else { - connection - } - } -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt deleted file mode 100644 index 0b15efed2..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseImpl.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import java.util.concurrent.atomic.AtomicInteger - -internal class LeaseImpl(override val initialAllowedRequests: Int, - override val timeToLiveSeconds: Int) : Lease { - private val allowedReqs: AtomicInteger - override val expiry: Long - - init { - assertNumberOfRequests(initialAllowedRequests, timeToLiveSeconds) - this.allowedReqs = AtomicInteger(initialAllowedRequests) - this.expiry = now() + timeToLiveSeconds - } - - override val allowedRequests: Int - get() = Math.max(0, allowedReqs.get()) - - override val isValid: Boolean - get() = initialAllowedRequests > 0 - && allowedRequests > 0 - && !isExpired - - fun availability(): Double { - return if (isValid) allowedRequests / - initialAllowedRequests.toDouble() else 0.0 - } - - fun use(useRequestCount: Int): Boolean { - assertUseRequests(useRequestCount) - return !isExpired && allowedReqs.accAndGet( - useRequestCount, - { cur, update -> Math.max(-1, cur - update) }) >= 0 - } - - companion object { - - private fun now() = System.currentTimeMillis() - - private inline fun AtomicInteger.accAndGet(update: Int, - acc: (Int, Int) -> Int): Int { - var cur: Int - var next: Int - do { - cur = get() - next = acc.invoke(cur, update) - } while (!compareAndSet(cur, next)) - return next - } - - fun expired(): LeaseImpl = LeaseImpl(0, 0) - - fun assertUseRequests(useRequestCount: Int) { - if (useRequestCount <= 0) { - throw IllegalArgumentException("Number of requests must be positive") - } - } - - private fun assertNumberOfRequests(numberOfRequests: Int, ttl: Int) { - if (numberOfRequests < 0) { - throw IllegalArgumentException("Number of requests must be non-negative") - } - if (ttl < 0) { - throw IllegalArgumentException("Time-to-live must be non-negative") - } - } - } -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt deleted file mode 100644 index c5153d49d..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.rsocket.kotlin.RSocket -import io.rsocket.kotlin.interceptors.RSocketInterceptor - -internal class LeaseInterceptor(private val leaseContext: LeaseContext, - private val leaseManager: LeaseManager, - private val tag: String) - : RSocketInterceptor { - - override fun invoke(rSocket: RSocket): RSocket = - LeaseRSocket( - leaseContext, - rSocket, - tag, - leaseManager) -} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt deleted file mode 100644 index e35d15412..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.rsocket.kotlin.exceptions.MissingLeaseException - -/** - * Updates Lease on use and grant - */ -internal class LeaseManager(private val tag: String) { - @Volatile - private var currentLease = expiredLease - - fun availability(): Double = currentLease.availability() - - fun grant(numberOfRequests: Int, ttl: Int): Lease { - assertGrantedLease(numberOfRequests, ttl) - currentLease = LeaseImpl(numberOfRequests, ttl) - return currentLease - } - - fun use(): Result = - if (currentLease.use(1)) - Success - else - Error(MissingLeaseException(currentLease, tag)) - - override fun toString(): String { - return "LeaseManager{tag='$tag'}" - } - - companion object { - private val expiredLease = LeaseImpl.expired() - - private fun assertGrantedLease(numberOfRequests: Int, ttl: Int) { - if (numberOfRequests <= 0) { - throw IllegalArgumentException("numberOfRequests must be positive") - } - if (ttl <= 0) { - throw IllegalArgumentException("time-to-live must be positive") - } - } - } -} - -internal sealed class Result - -internal object Success : Result() - -internal data class Error(val ex: Throwable) : Result() \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt deleted file mode 100644 index 7623fd551..000000000 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.Single -import io.rsocket.kotlin.Payload -import io.rsocket.kotlin.RSocket -import io.rsocket.kotlin.util.RSocketProxy -import org.reactivestreams.Publisher - -internal class LeaseRSocket( - private val leaseContext: LeaseContext, - source: RSocket, - private val tag: String, - private val leaseManager: LeaseManager) : RSocketProxy(source) { - - override fun fireAndForget(payload: Payload): Completable = - request(super.fireAndForget(payload)) - - override fun requestResponse(payload: Payload): Single = - request(super.requestResponse(payload)) - - override fun requestStream(payload: Payload): Flowable = - request(super.requestStream(payload)) - - override fun requestChannel(payloads: Publisher): Flowable = - request(super.requestChannel(payloads)) - - override fun availability(): Double = - Math.min(super.availability(), leaseManager.availability()) - - override fun toString(): String = - "LeaseRSocket(leaseContext=$leaseContext," + - " tag='$tag', " + - "leaseManager=$leaseManager)" - - private fun request(actual: Completable): Completable = - request( - { Completable.defer(it) }, - actual, - { Completable.error(it) }) - - private fun request(actual: Single): Single = - request( - { Single.defer(it) }, - actual, - { Single.error(it) }) - - private fun request(actual: Flowable): Flowable = - request( - { Flowable.defer(it) }, - actual, - { Flowable.error(it) }) - - private fun request( - defer: (() -> T) -> T, - actual: T, - error: (Throwable) -> T): T = - if (isEnabled()) defer { - val result = leaseManager.use() - when (result) { - is Success -> actual - is Error -> error(result.ex) - } - } else actual - - private fun isEnabled() = leaseContext.leaseEnabled -} diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseConnectionTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseConnectionTest.kt deleted file mode 100644 index bd1132f96..000000000 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseConnectionTest.kt +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.netty.buffer.Unpooled -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.processors.PublishProcessor -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.Frame -import io.rsocket.kotlin.FrameType -import io.rsocket.kotlin.test.util.LocalDuplexConnection -import org.junit.Assert.* -import org.junit.Before -import org.junit.Test -import java.nio.ByteBuffer -import java.nio.channels.ClosedChannelException -import java.util.concurrent.TimeUnit - -class LeaseConnectionTest { - - private lateinit var leaseGranterConnection: LeaseConnection - private lateinit var sender: PublishProcessor - private lateinit var receiver: PublishProcessor - private lateinit var send: LeaseManager - private lateinit var receive: LeaseManager - private lateinit var conn: DuplexConnection - - @Before - fun setUp() { - val leaseContext = LeaseContext() - send = LeaseManager("send") - receive = LeaseManager("receive") - sender = PublishProcessor.create() - receiver = PublishProcessor.create() - conn = LocalDuplexConnection("test", sender, receiver) - leaseGranterConnection = LeaseConnection( - leaseContext, - conn, - send, - receive) - } - - @Test - fun sentLease() { - leaseGranterConnection.send( - Flowable.just(Frame.Lease.from(2_000, 1, Unpooled.EMPTY_BUFFER))) - .blockingAwait() - assertEquals(1.0, receive.availability(), 1e-5) - assertEquals(0.0, send.availability(), 1e-5) - } - - @Test - fun receivedLease() { - leaseGranterConnection.receive().subscribe() - receiver.onNext(Frame.Lease.from(2, 1, Unpooled.EMPTY_BUFFER)) - assertEquals(0.0, receive.availability(), 1e-5) - assertEquals(1.0, send.availability(), 1e-5) - } - - @Test - fun grantLease() { - Completable.timer(100, TimeUnit.MILLISECONDS) - .andThen(Completable.defer { - leaseGranterConnection - .grant(2, 1) - }) - .subscribe() - val f = sender.firstOrError().blockingGet() - - assertNotNull(f) - assertTrue(f.type === FrameType.LEASE) - assertEquals(2, Frame.Lease.numberOfRequests(f)) - assertEquals(1, Frame.Lease.ttl(f)) - } - - @Test - fun grantLeaseAfterClose() { - leaseGranterConnection.onClose().subscribe() - conn.close().blockingAwait(5, TimeUnit.SECONDS) - leaseGranterConnection.grant(2,1) - .test() - .assertError(ClosedChannelException::class.java) - } -} diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt deleted file mode 100644 index 82eb1aa80..000000000 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseManagerTest.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.reactivex.Completable -import io.rsocket.kotlin.exceptions.MissingLeaseException -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue -import org.junit.Before -import org.junit.Test -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit - -class LeaseManagerTest { - - private lateinit var leaseManager: LeaseManager - - @Before - fun setUp() { - leaseManager = LeaseManager("") - } - - @Test - fun initialLeaseAvailability() { - assertEquals(0.0, leaseManager.availability(), 1e-5) - } - - @Test - fun useNoRequests() { - val result = leaseManager.use() - assertTrue(result is Error) - result as Error - assertTrue(result.ex is MissingLeaseException) - } - - @Test - fun grant() { - leaseManager.grant(2, 1_000) - assertEquals(1.0, leaseManager.availability(), 1e-5) - } - - @Test(expected = IllegalArgumentException::class) - fun grantLeaseZeroRequests() { - leaseManager.grant(0, 1_000) - } - - @Test(expected = IllegalArgumentException::class) - fun grantLeaseZeroTtl() { - leaseManager.grant(1, 0) - } - - @Test - fun use() { - leaseManager.grant(2, 1_000) - leaseManager.use() - assertEquals(0.5, leaseManager.availability(), 1e-5) - } - - @Test - fun useTimeout() { - leaseManager.grant(2, 1_000) - Completable.timer(1500, TimeUnit.MILLISECONDS).blockingAwait() - val result = leaseManager.use() - assertTrue(result is Error) - result as Error - assertTrue(result.ex is MissingLeaseException) - } - - companion object { - private val EMPTY_DATA = ByteBuffer.allocateDirect(0) - } -} diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt deleted file mode 100644 index d530cd292..000000000 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocketTest.kt +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.lease - -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.Single -import io.rsocket.kotlin.DefaultPayload -import io.rsocket.kotlin.Payload -import io.rsocket.kotlin.RSocket -import io.rsocket.kotlin.exceptions.MissingLeaseException -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue -import org.junit.Before -import org.junit.Test -import org.reactivestreams.Publisher -import java.nio.ByteBuffer - -class LeaseRSocketTest { - - private lateinit var leaseRSocket: LeaseRSocket - private lateinit var rSocket: MockRSocket - private lateinit var leaseManager: LeaseManager - - @Before - fun setUp() { - rSocket = MockRSocket() - leaseManager = LeaseManager("") - val leaseEnabled = LeaseContext() - leaseRSocket = LeaseRSocket(leaseEnabled, rSocket, "", leaseManager) - } - - @Test - fun grantedLease() { - leaseManager.grant(2, 1_000) - assertEquals(1.0, leaseRSocket.availability(), 1e-5) - } - - @Test - fun usedLease() { - leaseManager.grant(2, 1_000) - leaseRSocket.fireAndForget(DefaultPayload("test")).subscribe() - assertEquals(0.5, leaseRSocket.availability(), 1e-5) - } - - @Test - fun depletedLease() { - leaseManager.grant(1, 1_000) - val fireAndForget = leaseRSocket.fireAndForget(DefaultPayload("test")) - val firstErr = fireAndForget.blockingGet() - assertTrue(firstErr == null) - val secondErr = fireAndForget.blockingGet() - assertTrue(secondErr is MissingLeaseException) - } - - @Test - fun connectionNotAvailable() { - leaseManager.grant(1, 1_000) - rSocket.setAvailability(0.0f) - assertEquals(0.0, leaseRSocket.availability(), 1e-5) - } - - private class MockRSocket : RSocket { - private var availability = 1.0f - - fun setAvailability(availability: Float) { - this.availability = availability - } - - override fun availability(): Double = availability.toDouble() - - override fun fireAndForget(payload: Payload): Completable = - Completable.complete() - - override fun requestResponse(payload: Payload): Single = - Single.just(payload) - - override fun requestStream(payload: Payload): Flowable = - Flowable.just(payload) - - override fun requestChannel(payloads: Publisher): Flowable = - Flowable.fromPublisher(payloads) - - override fun metadataPush(payload: Payload): Completable = - Completable.complete() - - override fun close(): Completable = Completable.complete() - - override fun onClose(): Completable = Completable.complete() - } - - companion object { - private val EMPTY_DATA = ByteBuffer.allocateDirect(0) - } -} \ No newline at end of file