diff --git a/README.md b/README.md index 809e4fc09..ecb47caa0 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,6 @@ private fun handler(setup: Setup, rSocket: RSocket): Single { ### LICENSE Copyright 2015-2018 Netflix, Inc. -Maksym Ostroverkhov Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/ClientOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/ClientOptions.kt new file mode 100644 index 000000000..25f23c1be --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/ClientOptions.kt @@ -0,0 +1,7 @@ +package io.rsocket.kotlin + +class ClientOptions : Options() { + + override fun copy(): ClientOptions = + ClientOptions().streamRequestLimit(streamRequestLimit()) +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt index 701699847..d7da58816 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt @@ -2,10 +2,6 @@ package io.rsocket.kotlin import java.util.concurrent.TimeUnit -/** - * Created by Maksym Ostroverkhov on 27.10.17. - */ - data class Duration(private val value: Long, val unit: TimeUnit) { val millis = unit.toMillis(value) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveData.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveData.kt new file mode 100644 index 000000000..1c4ed360e --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveData.kt @@ -0,0 +1,11 @@ +package io.rsocket.kotlin + +import java.nio.ByteBuffer + +interface KeepAliveData { + + fun producer(): () -> ByteBuffer + + fun handler(): (ByteBuffer) -> Unit +} + diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt index 8115cad7d..fc0f49f3e 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt @@ -1,8 +1,11 @@ package io.rsocket.kotlin +import io.rsocket.kotlin.internal.EmptyKeepAliveData + class KeepAliveOptions : KeepAlive { private var interval: Duration = Duration.ofMillis(100) private var maxLifeTime: Duration = Duration.ofSeconds(1) + private var keepAliveData: KeepAliveData = EmptyKeepAliveData() fun keepAliveInterval(interval: Duration): KeepAliveOptions { assertDuration(interval, "keepAliveInterval") @@ -20,9 +23,17 @@ class KeepAliveOptions : KeepAlive { override fun keepAliveMaxLifeTime() = maxLifeTime + fun keepAliveData(keepAliveData: KeepAliveData): KeepAliveOptions { + this.keepAliveData = keepAliveData + return this + } + + fun keepAliveData(): KeepAliveData = keepAliveData + fun copy(): KeepAliveOptions = KeepAliveOptions() .keepAliveInterval(interval) .keepAliveMaxLifeTime(maxLifeTime) + .keepAliveData(keepAliveData) private fun assertDuration(duration: Duration, name: String) { if (duration.millis <= 0) { diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Options.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Options.kt new file mode 100644 index 000000000..201d023a4 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/Options.kt @@ -0,0 +1,22 @@ +package io.rsocket.kotlin + +abstract class Options> internal constructor() { + private var streamRequestLimit: Int = 128 + + @Suppress("UNCHECKED_CAST") + open fun streamRequestLimit(streamRequestLimit: Int): T { + assertRequestLimit(streamRequestLimit) + this.streamRequestLimit = streamRequestLimit + return this as T + } + + abstract fun copy(): T + + internal fun streamRequestLimit(): Int = streamRequestLimit + + private fun assertRequestLimit(streamRequestLimit: Int) { + if (streamRequestLimit <= 0) { + throw IllegalArgumentException("stream request limit must be positive") + } + } +} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt index ffaac747d..1791a2983 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt @@ -55,7 +55,7 @@ object RSocketFactory { private var setupPayload: Payload = DefaultPayload.EMPTY private val keepAlive = KeepAliveOptions() private val mediaType = MediaTypeOptions() - private var streamRequestLimit = defaultStreamRequestLimit + private val options = ClientOptions() fun interceptors(configure: (InterceptorOptions) -> Unit): ClientRSocketFactory { configure(interceptors) @@ -79,7 +79,7 @@ object RSocketFactory { return this } - fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory { + fun lease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory { this.flags = Frame.Setup.enableLease(flags) this.leaseRefConsumer = leaseRefConsumer return this @@ -95,9 +95,8 @@ object RSocketFactory { return this } - fun streamRequestLimit(streamRequestLimit: Int): ClientRSocketFactory { - assertRequestLimit(streamRequestLimit) - this.streamRequestLimit = streamRequestLimit + fun options(configure: (ClientOptions) -> Unit): ClientRSocketFactory { + configure(options) return this } @@ -126,7 +125,7 @@ object RSocketFactory { setupPayload, keepAlive.copy(), mediaType.copy(), - streamRequestLimit, + options.copy(), transport, interceptors.copy()) @@ -137,13 +136,17 @@ object RSocketFactory { private val leaseRef: ((LeaseRef) -> Unit)?, private val flags: Int, private val setupPayload: Payload, - private val keepAlive: KeepAlive, + keepAliveOpts: KeepAliveOptions, private val mediaType: MediaType, - private val streamRequestLimit: Int, + options: ClientOptions, private val transportClient: () -> ClientTransport, private val parentInterceptors: InterceptorRegistry) : Start { + private val streamRequestLimit = options.streamRequestLimit() + private val keepALive = keepAliveOpts as KeepAlive + private val keepAliveData = keepAliveOpts.keepAliveData() + override fun start(): Single { return transportClient() .connect() @@ -184,7 +187,8 @@ object RSocketFactory { ClientServiceHandler( demuxer.serviceConnection(), - keepAlive, + keepALive, + keepAliveData, errorConsumer) val setupFrame = createSetupFrame() @@ -205,8 +209,8 @@ object RSocketFactory { private fun createSetupFrame(): Frame { return Frame.Setup.from( flags, - keepAlive.keepAliveInterval().intMillis, - keepAlive.keepAliveMaxLifeTime().intMillis, + keepALive.keepAliveInterval().intMillis, + keepALive.keepAliveMaxLifeTime().intMillis, mediaType.metadataMimeType(), mediaType.dataMimeType(), setupPayload) @@ -229,7 +233,7 @@ object RSocketFactory { private var mtu = 0 private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null private val interceptors = GlobalInterceptors.create() - private var streamRequestLimit = defaultStreamRequestLimit + private val options = ServerOptions() fun interceptors(configure: (InterceptorOptions) -> Unit): ServerRSocketFactory { configure(interceptors) @@ -242,7 +246,7 @@ object RSocketFactory { return this } - fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory { + fun lease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory { this.leaseRefConsumer = leaseRefConsumer return this } @@ -252,8 +256,8 @@ object RSocketFactory { return this } - fun streamRequestLimit(streamRequestLimit: Int): ServerRSocketFactory { - this.streamRequestLimit = streamRequestLimit + fun options(configure: (ServerOptions) -> Unit): ServerRSocketFactory { + configure(options) return this } @@ -268,7 +272,7 @@ object RSocketFactory { mtu, leaseRefConsumer, interceptors.copy(), - streamRequestLimit) + options.copy()) } } @@ -279,7 +283,9 @@ object RSocketFactory { private val mtu: Int, private val leaseRef: ((LeaseRef) -> Unit)?, private val parentInterceptors: InterceptorRegistry, - private val streamRequestLimit: Int) : Start { + options: ServerOptions) : Start { + + private val streamRequestLimit = options.streamRequestLimit() override fun start(): Single { return transportServer().start(object @@ -331,12 +337,15 @@ object RSocketFactory { ServerServiceHandler( demuxer.serviceConnection(), - setup as KeepAlive, + setup, errorConsumer) val handlerRSocket = acceptor()(setup, wrappedRequester) - return handlerRSocket + val rejectingHandlerRSocket = RejectingRSocket(handlerRSocket) + .with(demuxer.requesterConnection()) + + return rejectingHandlerRSocket .map { handler -> interceptors.interceptHandler(handler) } .doOnSuccess { handler -> RSocketResponder( @@ -374,12 +383,6 @@ object RSocketFactory { } } - private fun assertRequestLimit(streamRequestLimit: Int) { - if (streamRequestLimit <= 0) { - throw IllegalArgumentException("stream request limit must be positive") - } - } - private fun assertFragmentation(mtu: Int) { if (mtu < 0) { throw IllegalArgumentException("fragmentation mtu must be non-negative") @@ -405,8 +408,6 @@ object RSocketFactory { transport { transport } } - private const val defaultStreamRequestLimit = 128 - private val emptyRSocket = object : AbstractRSocket() {} } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/ServerOptions.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/ServerOptions.kt new file mode 100644 index 000000000..9b0068969 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/ServerOptions.kt @@ -0,0 +1,7 @@ +package io.rsocket.kotlin + +class ServerOptions : Options() { + + override fun copy(): ServerOptions = + ServerOptions().streamRequestLimit(streamRequestLimit()) +} diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ClientServiceHandler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ClientServiceHandler.kt index ae5bf1d94..c86009161 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ClientServiceHandler.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ClientServiceHandler.kt @@ -6,18 +6,23 @@ import io.reactivex.Flowable import io.reactivex.disposables.Disposable import io.rsocket.kotlin.DuplexConnection import io.rsocket.kotlin.Frame -import io.rsocket.kotlin.exceptions.ConnectionException import io.rsocket.kotlin.KeepAlive +import io.rsocket.kotlin.KeepAliveData +import io.rsocket.kotlin.exceptions.ConnectionException +import java.nio.ByteBuffer import java.util.concurrent.TimeUnit internal class ClientServiceHandler(serviceConnection: DuplexConnection, keepAlive: KeepAlive, + keepAliveData: KeepAliveData, errorConsumer: (Throwable) -> Unit) : ServiceHandler(serviceConnection, errorConsumer) { @Volatile private var keepAliveReceivedMillis = System.currentTimeMillis() private var subscription: Disposable? = null + private val dataProducer: () -> ByteBuffer = keepAliveData.producer() + private val dataHandler: (ByteBuffer) -> Unit = keepAliveData.handler() init { val tickPeriod = keepAlive.keepAliveInterval().millis @@ -35,6 +40,9 @@ internal class ClientServiceHandler(serviceConnection: DuplexConnection, override fun handleKeepAlive(frame: Frame) { if (!Frame.Keepalive.hasRespondFlag(frame)) { keepAliveReceivedMillis = System.currentTimeMillis() + dataHandler(frame.data) + } else { + sendKeepAliveFrame(frame.data, false) } } @@ -51,8 +59,12 @@ internal class ClientServiceHandler(serviceConnection: DuplexConnection, "keep-alive timed out: $duration of $timeout ms" throw ConnectionException(message) } - sentFrames.onNext( - Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true)) + sendKeepAliveFrame(dataProducer(), true) } } + + private fun sendKeepAliveFrame(data: ByteBuffer, respond: Boolean) { + sentFrames.onNext( + Frame.Keepalive.from(Unpooled.wrappedBuffer(data), respond)) + } } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/EmptyKeepAliveData.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/EmptyKeepAliveData.kt new file mode 100644 index 000000000..95bced4a2 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/EmptyKeepAliveData.kt @@ -0,0 +1,17 @@ +package io.rsocket.kotlin.internal + +import io.rsocket.kotlin.KeepAliveData +import java.nio.ByteBuffer + +internal class EmptyKeepAliveData : KeepAliveData { + + override fun producer(): () -> ByteBuffer = noopProducer + + override fun handler(): (ByteBuffer) -> Unit = noopHandler + + companion object { + private val emptyBuffer = ByteBuffer.allocateDirect(0) + private val noopProducer = { emptyBuffer } + private val noopHandler: (ByteBuffer) -> Unit = { } + } +} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt index 576ead426..281c474b3 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt @@ -21,14 +21,12 @@ import io.reactivex.Flowable import io.reactivex.FlowableSubscriber import io.reactivex.Single import io.reactivex.processors.FlowableProcessor -import io.reactivex.processors.PublishProcessor import io.reactivex.processors.UnicastProcessor import io.rsocket.kotlin.* import io.rsocket.kotlin.exceptions.ApplicationException import io.rsocket.kotlin.exceptions.ChannelRequestException import io.rsocket.kotlin.exceptions.Exceptions import io.rsocket.kotlin.internal.ExceptionUtil.noStacktrace -import io.rsocket.kotlin.DefaultPayload import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription @@ -113,7 +111,7 @@ internal class RSocketRequester( val requestFrame = Frame.Request.from( streamId, FrameType.REQUEST_RESPONSE, payload, 1) - val receiver = PublishProcessor.create() + val receiver = UnicastProcessor.create() receivers[streamId] = receiver sentFrames.onNext(requestFrame) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RejectingRSocket.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RejectingRSocket.kt new file mode 100644 index 000000000..552fdc117 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RejectingRSocket.kt @@ -0,0 +1,18 @@ +package io.rsocket.kotlin.internal + +import io.reactivex.Single +import io.rsocket.kotlin.DuplexConnection +import io.rsocket.kotlin.Frame +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.exceptions.RejectedSetupException + +internal class RejectingRSocket(private val rSocket: Single) { + + fun with(connection: DuplexConnection): Single = rSocket + .onErrorResumeNext { err -> + connection + .sendOne(Frame.Error.from(0, + RejectedSetupException(err.message ?: ""))) + .andThen(Single.error(err)) + } +} \ No newline at end of file diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerServiceHandler.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerServiceHandler.kt index 46a70ee70..f4e4143c6 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerServiceHandler.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ServerServiceHandler.kt @@ -6,11 +6,11 @@ import io.reactivex.Flowable import io.reactivex.disposables.Disposable import io.rsocket.kotlin.DuplexConnection import io.rsocket.kotlin.Frame -import io.rsocket.kotlin.exceptions.ConnectionException import io.rsocket.kotlin.KeepAlive +import io.rsocket.kotlin.exceptions.ConnectionException import java.util.concurrent.TimeUnit -internal class ServerServiceHandler(serviceConnection: DuplexConnection, +internal class ServerServiceHandler(private val serviceConnection: DuplexConnection, keepAlive: KeepAlive, errorConsumer: (Throwable) -> Unit) : ServiceHandler(serviceConnection, errorConsumer) { @@ -22,7 +22,7 @@ internal class ServerServiceHandler(serviceConnection: DuplexConnection, init { val tickPeriod = keepAlive.keepAliveInterval().millis val timeout = keepAlive.keepAliveMaxLifeTime().millis - Flowable.interval(tickPeriod, TimeUnit.MILLISECONDS) + subscription = Flowable.interval(tickPeriod, TimeUnit.MILLISECONDS) .concatMapCompletable { checkKeepAlive(timeout) } .subscribe({}, { err -> diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt index fbb5dbcf1..b98107638 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/ConnectionLeaseRef.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.reactivex.Completable diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt index 978de9120..9f73e5445 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseContext.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease /** State shared by Lease related interceptors */ diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt index c6dc64765..5ec3d39ea 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseEnablingInterceptor.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.reactivex.Flowable diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt index 1b2f812f5..9864d6af8 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranter.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.reactivex.Completable diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt index 7a9b7e6d8..7e51dc922 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnection.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.netty.buffer.Unpooled diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt index b538ece44..f9bc7bd6e 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterInterceptor.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.rsocket.kotlin.DuplexConnection diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt index b8ae2f4e9..de2b538ab 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseInterceptor.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.rsocket.kotlin.RSocket diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt index 14224d4c8..aace5d536 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseManager.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.rsocket.kotlin.exceptions.MissingLeaseException diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt index 7263cc229..8db49a7a1 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseRSocket.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.reactivex.Completable diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt index f075af7d5..09da5a1f6 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/lease/LeaseSupport.kt @@ -1,19 +1,3 @@ -/* - * Copyright 2018 Maksym Ostroverkhov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package io.rsocket.kotlin.internal.lease import io.rsocket.kotlin.LeaseRef diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt index 956282a33..aea9cb6fd 100644 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/ServiceHandlerTest.kt @@ -1,7 +1,6 @@ package io.rsocket.kotlin import io.netty.buffer.Unpooled -import io.netty.buffer.Unpooled.EMPTY_BUFFER import io.reactivex.processors.UnicastProcessor import io.rsocket.kotlin.exceptions.ConnectionException import io.rsocket.kotlin.exceptions.RejectedSetupException @@ -12,6 +11,8 @@ import org.junit.After import org.junit.Assert.* import org.junit.Before import org.junit.Test +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit class ServiceHandlerTest { @@ -19,7 +20,7 @@ class ServiceHandlerTest { lateinit var receiver: UnicastProcessor lateinit var conn: LocalDuplexConnection private lateinit var errors: Errors - private lateinit var keepAlive: KeepAlive + private lateinit var keepAlive: KeepAliveOptions @Before fun setUp() { @@ -73,10 +74,7 @@ class ServiceHandlerTest { @Test(timeout = 2_000) fun clientServiceHandlerKeepAlive() { - ClientServiceHandler( - conn, - KeepAliveOptions(), - errors) + ClientServiceHandler(conn, keepAlive, keepAlive.keepAliveData(), errors) val sentKeepAlives = sender.take(3).toList().blockingGet() for (frame in sentKeepAlives) { assertTrue(frame.type == FrameType.KEEPALIVE) @@ -86,7 +84,7 @@ class ServiceHandlerTest { @Test(timeout = 2_000) fun clientServiceHandlerKeepAliveTimeout() { - ClientServiceHandler(conn, keepAlive, errors) + ClientServiceHandler(conn, keepAlive, keepAlive.keepAliveData(), errors) conn.onClose().blockingAwait() val errs = errors.get() assertEquals(1, errs.size) @@ -97,6 +95,81 @@ class ServiceHandlerTest { ?: throw AssertionError( "ConnectionException error must be non-null")) } + + @Test(timeout = 2_000) + fun clientKeepAliveRespond() { + val expectedReceive = "receive" + val keepAlive = KeepAliveOptions() + .keepAliveInterval(Duration.ofSeconds(42)) + ClientServiceHandler(conn, keepAlive, keepAlive.keepAliveData(), errors) + + receiver.onNext(Frame.Keepalive.from(Unpooled.wrappedBuffer(expectedReceive.bytes()), true)) + + val sent = sender + .filter { it.type == FrameType.KEEPALIVE } + .firstOrError() + .blockingGet() + + assertFalse(Frame.Keepalive.hasRespondFlag(sent)) + assertEquals(expectedReceive, sent.dataUtf8) + assertTrue(errors.get().isEmpty()) + } + + @Test + fun clientKeepAliveDataProducer() { + val expectedSent = "test" + val kad = TestKeepAliveData(expectedSent) + val keepAlive = KeepAliveOptions() + .keepAliveData(kad) + ClientServiceHandler(conn, keepAlive, keepAlive.keepAliveData(), errors) + val sent = sender + .filter { it.type == FrameType.KEEPALIVE } + .take(2) + .toList() + .blockingGet() + sent.forEach { + val actualData = it.dataUtf8 + assertEquals(expectedSent, actualData) + } + assertTrue(errors.get().isEmpty()) + } + + @Test + fun clientKeepAliveDataHandler() { + val expectedReceived = "receive" + val kad = TestKeepAliveData("test") + val keepAlive = KeepAliveOptions() + .keepAliveData(kad) + ClientServiceHandler(conn, keepAlive, keepAlive.keepAliveData(), errors) + receiver.onNext(Frame.Keepalive.from(Unpooled.wrappedBuffer(expectedReceived.bytes()), false)) + receiver.onNext(Frame.Keepalive.from(Unpooled.wrappedBuffer(expectedReceived.bytes()), false)) + + val actualReceive = kad.handled() + assertEquals(2, actualReceive.size) + actualReceive.forEach { actual -> + assertEquals(expectedReceived, actual) + } + assertTrue(errors.get().isEmpty()) + } +} + +private fun String.bytes() = toByteArray(StandardCharsets.UTF_8) + +private fun ByteBuffer.asString() = StandardCharsets.UTF_8.decode(this).toString() + +private class TestKeepAliveData(private val data: String) : KeepAliveData { + + private val handled = ArrayList() + + override fun producer(): () -> ByteBuffer = { + ByteBuffer.wrap(data.bytes()) + } + + override fun handler(): (ByteBuffer) -> Unit = { + handled += it.asString() + } + + fun handled(): List = ArrayList(handled) } private class Errors : (Throwable) -> Unit { diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/RejectingRSocketTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/RejectingRSocketTest.kt new file mode 100644 index 000000000..364825b52 --- /dev/null +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/RejectingRSocketTest.kt @@ -0,0 +1,55 @@ +package io.rsocket.kotlin.internal + +import io.reactivex.Single +import io.reactivex.processors.UnicastProcessor +import io.rsocket.kotlin.Frame +import io.rsocket.kotlin.FrameType +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.exceptions.Exceptions +import io.rsocket.kotlin.exceptions.RejectedSetupException +import io.rsocket.kotlin.test.util.LocalDuplexConnection +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Before +import org.junit.Test + +class RejectingRSocketTest { + private lateinit var sender: UnicastProcessor + private lateinit var receiver: UnicastProcessor + private lateinit var conn: LocalDuplexConnection + + @Before + fun setUp() { + sender = UnicastProcessor.create() + receiver = UnicastProcessor.create() + conn = LocalDuplexConnection("test", sender, receiver) + } + + @Test(timeout = 2_000) + fun rejectError() { + val expectedMsg = "error" + val rejectingRSocket = rejectingRSocket(IllegalArgumentException(expectedMsg)) + rejectingRSocket.subscribe({}, {}) + val frame = sender.firstOrError().blockingGet() + assertTrue(frame.type == FrameType.ERROR) + assertTrue(frame.streamId == 0) + val err = Exceptions.from(frame) + assertTrue(err is RejectedSetupException) + assertEquals(expectedMsg, err.message) + } + + @Test(timeout = 2_000) + fun rejectErrorEmptyMessage() { + val rejectingRSocket = rejectingRSocket(RuntimeException()) + rejectingRSocket.subscribe({}, {}) + val frame = sender.firstOrError().blockingGet() + val err = Exceptions.from(frame) + assertTrue(err is RejectedSetupException) + assertEquals("", err.message) + } + + private fun rejectingRSocket(err: Throwable): Single { + val rSocket = Single.error(err) + return RejectingRSocket(rSocket).with(conn) + } +} \ No newline at end of file diff --git a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt index 160d49851..21d3058f8 100644 --- a/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt +++ b/rsocket-core/src/test/kotlin/io/rsocket/kotlin/internal/lease/LeaseGranterConnectionTest.kt @@ -42,7 +42,7 @@ class LeaseGranterConnectionTest { @Test fun sentLease() { leaseGranterConnection.send( - Flowable.just(Frame.Lease.from(2, 1, Unpooled.EMPTY_BUFFER))) + Flowable.just(Frame.Lease.from(2_000, 1, Unpooled.EMPTY_BUFFER))) .blockingAwait() assertEquals(1.0, receive.availability(), 1e-5) assertEquals(0.0, send.availability(), 1e-5) diff --git a/rsocket-transport-netty/build.gradle b/rsocket-transport-netty/build.gradle index 69e155cec..3dadb3de0 100644 --- a/rsocket-transport-netty/build.gradle +++ b/rsocket-transport-netty/build.gradle @@ -3,7 +3,7 @@ apply plugin: 'com.jfrog.artifactory' dependencies { api project(":rsocket-core") - implementation ('io.projectreactor.ipc:reactor-netty:0.7.7.RELEASE') + implementation ('io.projectreactor.ipc:reactor-netty:0.7.8.RELEASE') } compileKotlin { kotlinOptions.jvmTarget = "1.8" diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt index d9042b020..aa5e8780d 100644 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt +++ b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt @@ -38,8 +38,7 @@ class TcpServerTransport private constructor(private var server: TcpServer) inbound, outbound, inbound.context()) - acceptor(connection).subscribe() - outbound.neverComplete() + acceptor(connection).andThen(outbound.neverComplete()) }.toSingle().map { NettyContextCloseable(it) } companion object { diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt index df442a31e..102169f9e 100644 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt +++ b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt @@ -22,12 +22,9 @@ import io.rsocket.kotlin.transport.ServerTransport import io.rsocket.kotlin.transport.ServerTransport.ConnectionAcceptor import io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection import io.rsocket.kotlin.transport.netty.toSingle -import reactor.core.publisher.Mono import reactor.ipc.netty.http.server.HttpServer import reactor.ipc.netty.http.server.HttpServerRoutes -import java.util.function.Consumer - class WebsocketRouteTransport(private val server: HttpServer, private val routesBuilder: (HttpServerRoutes) -> Unit, private val path: String) : ServerTransport { @@ -41,8 +38,7 @@ class WebsocketRouteTransport(private val server: HttpServer, inbound, outbound, inbound.context()) - acceptor(connection).subscribe() - outbound.neverComplete() + acceptor(connection).andThen(outbound.neverComplete()) } }.toSingle().map { NettyContextCloseable(it) } } diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt index 218b436bd..21fcc3357 100644 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt +++ b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt @@ -39,8 +39,7 @@ class WebsocketServerTransport private constructor(internal var server: HttpServ inbound, outbound, inbound.context()) - acceptor(connection).subscribe() - outbound.neverComplete() + acceptor(connection).andThen(outbound.neverComplete()) } }.toSingle().map { NettyContextCloseable(it) } } diff --git a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt index 6b4e67e2d..ff36a4da8 100644 --- a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt +++ b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt @@ -13,9 +13,6 @@ import okio.ByteString import org.reactivestreams.Publisher import java.nio.channels.ClosedChannelException -/** - * Created by Maksym Ostroverkhov on 27.10.17. - */ internal class OkWebsocket(client: OkHttpClient, request: Request) { @Volatile diff --git a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/client/OkhttpWebsocketClientTransport.kt b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/client/OkhttpWebsocketClientTransport.kt index f2afaca75..2047b0bdd 100644 --- a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/client/OkhttpWebsocketClientTransport.kt +++ b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/client/OkhttpWebsocketClientTransport.kt @@ -8,9 +8,6 @@ import okhttp3.HttpUrl import okhttp3.OkHttpClient import okhttp3.Request -/** - * Created by Maksym Ostroverkhov on 28.10.17. - */ class OkhttpWebsocketClientTransport private constructor(private val client: OkHttpClient, private val request: Request) : ClientTransport { diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt index 238149ed9..a7e5a4e3d 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt @@ -1,5 +1,6 @@ package io.rsocket.kotlin.test.lease +import io.reactivex.Flowable import io.reactivex.Single import io.reactivex.processors.BehaviorProcessor import io.reactivex.subscribers.TestSubscriber @@ -25,14 +26,14 @@ class LeaseServerTest { fun setUp() { serverLease = LeaseRefs() nettyContextCloseable = RSocketFactory.receive() - .enableLease(serverLease) + .lease(serverLease) .acceptor { { _, _ -> Single.just( object : AbstractRSocket() { - override fun requestResponse(payload: Payload) - : Single = - Single.just(payload) + override fun requestStream(payload: Payload) + : Flowable = + Flowable.just(payload) }) } } @@ -43,7 +44,7 @@ class LeaseServerTest { val address = nettyContextCloseable.address() clientSocket = RSocketFactory .connect() - .enableLease(LeaseRefs()) + .lease(LeaseRefs()) .keepAlive { opts -> opts.keepAliveInterval(Duration.ofMinutes(1)) .keepAliveMaxLifeTime(Duration.ofMinutes(20)) @@ -69,16 +70,15 @@ class LeaseServerTest { .delay(100, TimeUnit.MILLISECONDS) .blockingAwait() assertEquals(clientSocket.availability(), 1.0, 1e-5) - clientSocket.requestResponse(payload()) - .blockingGet() + clientSocket.requestStream(payload()) + .blockingSubscribe() assertEquals(clientSocket.availability(), 0.5, 1e-5) - clientSocket.requestResponse(payload()) - .blockingGet() + clientSocket.requestStream(payload()) + .blockingSubscribe() assertEquals(clientSocket.availability(), 0.0, 1e-5) val subscriber = TestSubscriber() - clientSocket.requestResponse(payload()) - .toFlowable() + clientSocket.requestStream(payload()) .blockingSubscribe(subscriber) assertEquals(1, subscriber.errorCount()) assertTrue(subscriber.errors().first() is MissingLeaseException) @@ -96,7 +96,7 @@ class LeaseServerTest { assertEquals(clientSocket.availability(), 0.0, 1e-5) val subscriber = TestSubscriber() - clientSocket.requestResponse(payload()).toFlowable() + clientSocket.requestStream(payload()) .blockingSubscribe(subscriber) assertEquals(1, subscriber.errorCount()) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt index 74e650e0d..06d10a42a 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt @@ -20,7 +20,7 @@ object LeaseClientServerExample { val serverLease = LeaseRefs() val nettyContextCloseable = RSocketFactory.receive() - .enableLease(serverLease) + .lease(serverLease) .acceptor { { _, _ -> Single.just( @@ -37,7 +37,7 @@ object LeaseClientServerExample { val address = nettyContextCloseable.address() val clientSocket = RSocketFactory.connect() - .enableLease(LeaseRefs()) + .lease(LeaseRefs()) .keepAlive { opts -> opts.keepAliveInterval(Duration.ofMinutes(1)) .keepAliveMaxLifeTime(Duration.ofMinutes(20)) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt index 31632083f..c80856fa9 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt @@ -6,8 +6,4 @@ import io.rsocket.kotlin.transport.netty.server.TcpServerTransport class NettyTcpEndToEndTest : EndToEndTest( { TcpClientTransport.create(it) }, - { TcpServerTransport.create(it) }) { - - override fun response() { - } -} \ No newline at end of file + { TcpServerTransport.create(it) }) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt index cf9f1af98..218b36a58 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt @@ -1,17 +1,8 @@ package io.rsocket.kotlin.test.transport -import io.rsocket.kotlin.test.transport.EndToEndTest import io.rsocket.kotlin.transport.netty.client.WebsocketClientTransport import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport class NettyWebsocketEndToEndTest : EndToEndTest( { WebsocketClientTransport.create(it) }, - { WebsocketServerTransport.create(it.port) }) { - - /*noop until https://github.com/reactor/reactor-netty/pull/346 is released*/ - override fun close() { - } - - override fun closedAvailability() { - } -} \ No newline at end of file + { WebsocketServerTransport.create(it.port) })