diff --git a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt b/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt index d954be218..5b93f578d 100644 --- a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt +++ b/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt @@ -70,9 +70,9 @@ class RSocketKotlinBenchmark : RSocketBenchmark() { } } - override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( - ByteArray(size / 2).also { Random.nextBytes(it) }, - ByteArray(size / 2).also { Random.nextBytes(it) } + override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload.wrap( + data = ByteArray(size / 2).also { Random.nextBytes(it) }, + metadata = ByteArray(size / 2).also { Random.nextBytes(it) } ) override fun releasePayload(payload: Payload) { diff --git a/build.gradle.kts b/build.gradle.kts index 43f906da0..d501610ab 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,6 @@ import com.jfrog.bintray.gradle.* import com.jfrog.bintray.gradle.tasks.* import org.gradle.api.publish.maven.internal.artifact.* import org.jetbrains.kotlin.gradle.dsl.* -import org.jetbrains.kotlin.gradle.plugin.* import org.jetbrains.kotlin.gradle.targets.js.* import org.jetbrains.kotlin.gradle.targets.jvm.* import org.jfrog.gradle.plugin.artifactory.dsl.* @@ -62,8 +61,21 @@ subprojects { is KotlinJsTarget -> { useCommonJs() //configure running tests for JS - nodejs { testTask { useKarma { useChromeHeadless() } } } - browser { testTask { useKarma { useChromeHeadless() } } } + nodejs { + testTask { + useMocha { + timeout = "600s" + } + } + } + browser { + testTask { + useKarma { + useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d")) + useChromeHeadless() + } + } + } } is KotlinJvmTarget -> { compilations.all { @@ -81,7 +93,7 @@ subprojects { useExperimentalAnnotation("kotlin.RequiresOptIn") - if (name.contains("test", ignoreCase = true)) { + if (name.contains("test", ignoreCase = true) || project.name == "rsocket-test") { useExperimentalAnnotation("kotlin.time.ExperimentalTime") useExperimentalAnnotation("kotlin.ExperimentalStdlibApi") useExperimentalAnnotation("kotlinx.coroutines.ExperimentalCoroutinesApi") @@ -94,20 +106,10 @@ subprojects { } } - - //should be not needed after ~1.4.20 when kotlin-test will be added automatically - val commonTest by sourceSets.getting { - dependencies { - implementation(kotlin("test-common")) - implementation(kotlin("test-annotations-common")) - } - } - targets.all { - compilations.findByName("test")?.dependencies { - when (platformType) { - KotlinPlatformType.jvm -> implementation(kotlin("test-junit")) - KotlinPlatformType.js -> implementation(kotlin("test-js")) - else -> Unit + if (project.name != "rsocket-test") { + val commonTest by sourceSets.getting { + dependencies { + implementation(project(":rsocket-test")) } } } diff --git a/js/karma.config.d/karma.conf.js b/js/karma.config.d/karma.conf.js new file mode 100644 index 000000000..492779078 --- /dev/null +++ b/js/karma.config.d/karma.conf.js @@ -0,0 +1,5 @@ +config.client = config.client || {} +config.client.mocha = config.client.mocha || {} +config.client.mocha.timeout = '600s' +config.browserNoActivityTimeout = 600000 +config.browserDisconnectTimeout = 600000 diff --git a/playground/src/commonMain/kotlin/Defaults.kt b/playground/src/commonMain/kotlin/Stub.kt similarity index 100% rename from playground/src/commonMain/kotlin/Defaults.kt rename to playground/src/commonMain/kotlin/Stub.kt diff --git a/playground/src/jvmMain/kotlin/tcp/TCPClient.kt b/playground/src/commonMain/kotlin/TCP.kt similarity index 51% rename from playground/src/jvmMain/kotlin/tcp/TCPClient.kt rename to playground/src/commonMain/kotlin/TCP.kt index 89faf7028..d52c4dd63 100644 --- a/playground/src/jvmMain/kotlin/tcp/TCPClient.kt +++ b/playground/src/commonMain/kotlin/TCP.kt @@ -14,40 +14,42 @@ * limitations under the License. */ -package tcp - -import doSomething import io.ktor.network.selector.* import io.ktor.network.sockets.* import io.ktor.util.* import io.rsocket.kotlin.connection.* +import io.rsocket.kotlin.core.* import io.rsocket.kotlin.payload.* -import kotlinx.coroutines.* -import java.util.concurrent.* - -@OptIn(KtorExperimentalAPI::class) -fun main(): Unit = runBlocking { - val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher() - val socket = aSocket(ActorSelectorManager(dispatcher)).tcp().connect("127.0.0.1", 2323) - - val client = socket.connection.connectClient() - try { - client.doSomething() - } catch (e: Throwable) { - dispatcher.close() - throw e - } +import kotlin.coroutines.* + +@OptIn(KtorExperimentalAPI::class, InternalAPI::class) +suspend fun runTcpClient(dispatcher: CoroutineContext): Unit { + aSocket(SelectorManager(dispatcher)) + .tcp() + .connect("127.0.0.1", 2323) + .connection + .connectClient() + .doSomething() } +@OptIn(KtorExperimentalAPI::class, InternalAPI::class) +suspend fun runTcpServer(dispatcher: CoroutineContext): Unit { + aSocket(SelectorManager(dispatcher)) + .tcp() + .bind("127.0.0.1", 2323) + .rSocket(acceptor = rSocketAcceptor) +} //to test nodejs tcp server -@OptIn(KtorExperimentalAPI::class) -fun main2(): Unit = runBlocking { - val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect("127.0.0.1", 9000) - - val client = socket.connection.connectClient() +@OptIn(KtorExperimentalAPI::class, InternalAPI::class) +suspend fun testNodeJsServer(dispatcher: CoroutineContext) { + val client = + aSocket(SelectorManager(dispatcher)) + .tcp() + .connect("127.0.0.1", 9000) + .connection + .connectClient() val response = client.requestResponse(Payload("Hello from JVM")) - println(response.data.readText()) } diff --git a/playground/src/commonMain/kotlin/ws/WSClient.common.kt b/playground/src/commonMain/kotlin/WS.kt similarity index 90% rename from playground/src/commonMain/kotlin/ws/WSClient.common.kt rename to playground/src/commonMain/kotlin/WS.kt index ecf56e29c..4e0e32aa2 100644 --- a/playground/src/commonMain/kotlin/ws/WSClient.common.kt +++ b/playground/src/commonMain/kotlin/WS.kt @@ -14,19 +14,14 @@ * limitations under the License. */ -package ws - -import doSomething import io.ktor.client.* import io.ktor.client.engine.* import io.ktor.client.features.websocket.* import io.ktor.util.* import io.rsocket.kotlin.core.* -expect val engine: HttpClientEngineFactory<*> - @OptIn(KtorExperimentalAPI::class) -suspend fun run() { +suspend fun runWSClient(engine: HttpClientEngineFactory<*>) { val client = HttpClient(engine) { install(WebSockets) install(RSocketClientSupport) diff --git a/playground/src/jsMain/kotlin/ws/WSClient.kt b/playground/src/jsMain/kotlin/WSApp.kt similarity index 84% rename from playground/src/jsMain/kotlin/ws/WSClient.kt rename to playground/src/jsMain/kotlin/WSApp.kt index d1ab4f529..893b4820b 100644 --- a/playground/src/jsMain/kotlin/ws/WSClient.kt +++ b/playground/src/jsMain/kotlin/WSApp.kt @@ -14,11 +14,6 @@ * limitations under the License. */ -package ws - -import io.ktor.client.engine.* import io.ktor.client.engine.js.* -actual val engine: HttpClientEngineFactory<*> = Js - -suspend fun main() = run() +suspend fun main(): Unit = runWSClient(Js) diff --git a/playground/src/jvmMain/kotlin/TcpClientApp.kt b/playground/src/jvmMain/kotlin/TcpClientApp.kt new file mode 100644 index 000000000..28eab3032 --- /dev/null +++ b/playground/src/jvmMain/kotlin/TcpClientApp.kt @@ -0,0 +1,19 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.coroutines.* + +suspend fun main(): Unit = runTcpClient(Dispatchers.IO) diff --git a/playground/src/jvmMain/kotlin/TcpServerApp.kt b/playground/src/jvmMain/kotlin/TcpServerApp.kt new file mode 100644 index 000000000..494347a38 --- /dev/null +++ b/playground/src/jvmMain/kotlin/TcpServerApp.kt @@ -0,0 +1,19 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.coroutines.* + +suspend fun main(): Unit = runTcpServer(Dispatchers.IO) diff --git a/playground/src/jvmMain/kotlin/ws/WSClient.kt b/playground/src/jvmMain/kotlin/WSClientApp.kt similarity index 78% rename from playground/src/jvmMain/kotlin/ws/WSClient.kt rename to playground/src/jvmMain/kotlin/WSClientApp.kt index 7dfd1dd95..5737b29b1 100644 --- a/playground/src/jvmMain/kotlin/ws/WSClient.kt +++ b/playground/src/jvmMain/kotlin/WSClientApp.kt @@ -14,14 +14,6 @@ * limitations under the License. */ -package ws - -import io.ktor.client.engine.* import io.ktor.client.engine.cio.* -import io.ktor.util.* - -@OptIn(KtorExperimentalAPI::class) -actual val engine: HttpClientEngineFactory<*> = CIO - -suspend fun main() = run() +suspend fun main(): Unit = runWSClient(CIO) diff --git a/playground/src/jvmMain/kotlin/ws/WSServer.kt b/playground/src/jvmMain/kotlin/WSServerApp.kt similarity index 96% rename from playground/src/jvmMain/kotlin/ws/WSServer.kt rename to playground/src/jvmMain/kotlin/WSServerApp.kt index 4f68a41f5..8235bf9d8 100644 --- a/playground/src/jvmMain/kotlin/ws/WSServer.kt +++ b/playground/src/jvmMain/kotlin/WSServerApp.kt @@ -14,15 +14,12 @@ * limitations under the License. */ -package ws - import io.ktor.application.* import io.ktor.routing.* import io.ktor.server.cio.* import io.ktor.server.engine.* import io.ktor.util.* import io.rsocket.kotlin.core.* -import rSocketAcceptor @OptIn(KtorExperimentalAPI::class) fun main() { diff --git a/playground/src/jvmMain/kotlin/tcp/TCPServer.kt b/playground/src/jvmMain/kotlin/tcp/TCPServer.kt deleted file mode 100644 index 60f94b207..000000000 --- a/playground/src/jvmMain/kotlin/tcp/TCPServer.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package tcp - -import io.ktor.network.selector.* -import io.ktor.network.sockets.* -import io.ktor.util.* -import io.rsocket.kotlin.core.* -import kotlinx.coroutines.* -import rSocketAcceptor -import java.util.concurrent.* - -@OptIn(KtorExperimentalAPI::class) -fun main(): Unit = runBlocking { - val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher() - val server = aSocket(ActorSelectorManager(dispatcher)).tcp().bind("127.0.0.1", 2323) - - server.rSocket(acceptor = rSocketAcceptor) -} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt index 9373a5c63..2915b8391 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt @@ -41,9 +41,9 @@ internal class RSocketResponder( fun handlerRequestResponse(frame: RequestFrame): Unit = with(state) { val streamId = frame.streamId launchCancelable(streamId) { - val response = requestOrThrow(streamId) { + val response = requestOrCancel(streamId) { requestHandler.requestResponse(frame.payload) - } + } ?: return@launchCancelable if (isActive) send(NextCompletePayloadFrame(streamId, response)) } } @@ -51,9 +51,9 @@ internal class RSocketResponder( fun handleRequestStream(initFrame: RequestFrame): Unit = with(state) { val streamId = initFrame.streamId launchCancelable(streamId) { - val response = requestOrThrow(streamId) { + val response = requestOrCancel(streamId) { requestHandler.requestStream(initFrame.payload) - } + } ?: return@launchCancelable response.collectLimiting( streamId, RequestStreamResponderFlowCollector(state, streamId, initFrame.initialRequest) @@ -68,9 +68,9 @@ internal class RSocketResponder( val request = RequestChannelResponderFlow(streamId, receiver, state) launchCancelable(streamId) { - val response = requestOrThrow(streamId) { + val response = requestOrCancel(streamId) { requestHandler.requestChannel(request) - } + } ?: return@launchCancelable response.collectLimiting( streamId, RequestStreamResponderFlowCollector(state, streamId, initFrame.initialRequest) @@ -80,13 +80,15 @@ internal class RSocketResponder( } } - private inline fun CoroutineScope.requestOrThrow(streamId: Int, block: () -> T): T { - return try { + private inline fun CoroutineScope.requestOrCancel(streamId: Int, block: () -> T): T? = + try { block() } catch (e: Throwable) { - if (isActive) state.send(ErrorFrame(streamId, e)) - throw e + if (isActive) { + state.send(ErrorFrame(streamId, e)) + cancel("Request handling failed", e) //KLUDGE: can be related to IR, because using `throw` fails on JS IR and Native + } + null } - } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt index 563e94d6b..3a1dcd927 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt @@ -78,7 +78,7 @@ internal class RSocketState( suspend inline fun Flow.collectLimiting( streamId: Int, limitingCollector: LimitingFlowCollector, - ) { + ): Unit = coroutineScope { limits[streamId] = limitingCollector try { collect(limitingCollector) @@ -86,8 +86,8 @@ internal class RSocketState( } catch (e: Throwable) { limits.remove(streamId) //if isn't active, then, that stream was cancelled, and so no need for error frame - if (currentCoroutineContext().isActive) send(ErrorFrame(streamId, e)) - throw e + if (isActive) send(ErrorFrame(streamId, e)) + cancel("Collect failed", e) //KLUDGE: can be related to IR, because using `throw` fails on JS IR and Native } } @@ -114,7 +114,7 @@ internal class RSocketState( is RequestNFrame -> limits[streamId]?.updateRequests(frame.requestN) is CancelFrame -> senders.remove(streamId)?.cancel() is ErrorFrame -> receivers.remove(streamId)?.close(frame.throwable) - is RequestFrame -> when (frame.type) { + is RequestFrame -> when (frame.type) { FrameType.Payload -> receivers[streamId]?.offer(frame) FrameType.RequestFnF -> responder.handleFireAndForget(frame) FrameType.RequestResponse -> responder.handlerRequestResponse(frame) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt index 47feaf15f..318282d0a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt @@ -20,7 +20,7 @@ import io.ktor.utils.io.core.* class Payload( val data: ByteReadPacket, - val metadata: ByteReadPacket? = null + val metadata: ByteReadPacket? = null, ) { companion object { val Empty = Payload(ByteReadPacket.Empty) @@ -42,6 +42,15 @@ fun Payload(data: String, metadata: String? = null): Payload = Payload( @Suppress("FunctionName") fun Payload(data: ByteArray, metadata: ByteArray? = null): Payload = Payload( + data = buildPacket { writeFully(data) }, + metadata = metadata?.let { buildPacket { writeFully(it) } } +) + +/** + * Wrap data and metadata arrays without copying them. + * Changes in input arrays will change payload data, same as reading from payload will change input arrays. + */ +fun Payload.Companion.wrap(data: ByteArray, metadata: ByteArray? = null): Payload = Payload( data = ByteReadPacket(data), metadata = metadata?.let { ByteReadPacket(it) } ) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt index 433c820bd..c9b97bf6c 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt @@ -23,12 +23,11 @@ import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlin.test.* -import kotlin.time.* -@ExperimentalTime -class SetupRejectionTest { +class SetupRejectionTest : SuspendTest { @Test fun responderRejectSetup() = test { val errorMessage = "error" diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index 704fd0c9a..ceff568ef 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -22,6 +22,7 @@ import io.rsocket.kotlin.error.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* @@ -29,14 +30,17 @@ import kotlin.coroutines.* import kotlin.test.* import kotlin.time.* -class RSocketRequesterTest { - private val connection = TestConnection() - private val ignoredFrames = Channel(Channel.UNLIMITED) - private val requester = run { +class RSocketRequesterTest : TestWithConnection() { + lateinit var ignoredFrames: Channel + private lateinit var requester: RSocketRequester + + override suspend fun before() { + super.before() + + ignoredFrames = Channel(Channel.UNLIMITED) val state = RSocketState(connection, KeepAlive(1000.seconds, 1000.seconds), ignoredFrames::offer) - val requester = RSocketRequester(state, StreamId.client()) + requester = RSocketRequester(state, StreamId.client()) state.start(RSocketRequestHandler { }) - requester } @Test @@ -155,7 +159,6 @@ class RSocketRequesterTest { connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) delay(200) expectItem().let { frame -> - println(frame) assertTrue(frame is CancelFrame) } delay(200) @@ -239,7 +242,7 @@ class RSocketRequesterTest { awaitClose() } val response = requester.requestChannel(request).launchIn(CoroutineScope(connection.job)) - delay(100) + delay(200) val requestFrame = connection.sentFrames.first() assertTrue(requestFrame is RequestFrame) assertEquals(FrameType.RequestChannel, requestFrame.type) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt index b98c7d0de..49b26285e 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketTest.kt @@ -23,13 +23,53 @@ import io.rsocket.kotlin.core.* import io.rsocket.kotlin.error.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlin.test.* import kotlin.time.* -class RSocketTest { +class RSocketTest : SuspendTest { + + lateinit var serverConnection: LocalConnection + lateinit var clientConnection: LocalConnection + + override suspend fun before() { + super.before() + + val clientChannel = Channel(Channel.UNLIMITED) + val serverChannel = Channel(Channel.UNLIMITED) + + serverConnection = LocalConnection("server", clientChannel, serverChannel) + clientConnection = LocalConnection("client", serverChannel, clientChannel) + + } + + override suspend fun after() { + super.after() + + serverConnection.job.cancelAndJoin() + clientConnection.job.cancelAndJoin() + } + + private suspend fun start(handler: RSocket? = null): RSocket = coroutineScope { + launch { + serverConnection.startServer { + handler ?: RSocketRequestHandler { + requestResponse = { it } + requestStream = { + flow { repeat(10) { emit(Payload("server got -> [$it]")) } } + } + requestChannel = { + it.launchIn(CoroutineScope(job)) + flow { repeat(10) { emit(Payload("server got -> [$it]")) } } + } + } + } + } + clientConnection.connectClient(RSocketConnectorConfiguration(keepAlive = KeepAlive(1000.seconds, 1000.seconds))) + } @Test fun testRequestResponseNoError() = test { @@ -230,29 +270,4 @@ class RSocketTest { assertEquals(payload.data.readText(), otherPayload.data.readText()) } - private suspend fun start(handler: RSocket? = null): RSocket { - val clientChannel = Channel(Channel.UNLIMITED) - val serverChannel = Channel(Channel.UNLIMITED) - val serverConnection = LocalConnection("server", clientChannel, serverChannel) - val clientConnection = LocalConnection("client", serverChannel, clientChannel) - - return coroutineScope { - launch { - serverConnection.startServer { - handler ?: RSocketRequestHandler { - requestResponse = { it } - requestStream = { - flow { repeat(10) { emit(Payload("server got -> [$it]")) } } - } - requestChannel = { - it.launchIn(CoroutineScope(job)) - flow { repeat(10) { emit(Payload("server got -> [$it]")) } } - } - } - } - } - clientConnection.connectClient(RSocketConnectorConfiguration(keepAlive = KeepAlive(1000.seconds, 1000.seconds))) - } - } - } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt index 6cf556990..ff9792ac0 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt @@ -21,13 +21,14 @@ import io.rsocket.kotlin.* import io.rsocket.kotlin.error.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.test.* import kotlin.time.* -class KeepAliveTest { - private val connection = TestConnection() +class KeepAliveTest : TestWithConnection() { + private fun requester(keepAlive: KeepAlive = KeepAlive(100.milliseconds, 1.seconds)): RSocket = run { val state = RSocketState(connection, keepAlive) {} val requester = RSocketRequester(state, StreamId.client()) diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt index c0fbaf25a..e6a442de4 100644 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt +++ b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/ConcurrentMap.kt @@ -16,4 +16,4 @@ package io.rsocket.kotlin.internal -actual fun concurrentMap(): MutableMap = mutableMapOf() +internal actual fun concurrentMap(): MutableMap = mutableMapOf() diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt index d29543172..1d81792b1 100644 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt +++ b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt @@ -26,17 +26,17 @@ class ConsoleLogger( override fun rawLog(level: LoggingLevel, throwable: Throwable?, message: Any?) { val meta = "[$level] ($tag)" when (level) { - LoggingLevel.ERROR -> console.error(meta, message, throwable) - LoggingLevel.WARN -> console.warn(meta, message, throwable) - LoggingLevel.INFO -> console.info(meta, message, throwable) - LoggingLevel.DEBUG -> console.log(meta, message, throwable) - LoggingLevel.TRACE -> console.log(meta, message, throwable) + LoggingLevel.ERROR -> throwable?.let { console.error(meta, message, "Error:", it) } ?: console.error(meta, message) + LoggingLevel.WARN -> throwable?.let { console.warn(meta, message, "Error:", it) } ?: console.warn(meta, message) + LoggingLevel.INFO -> throwable?.let { console.info(meta, message, "Error:", it) } ?: console.info(meta, message) + LoggingLevel.DEBUG -> throwable?.let { console.log(meta, message, "Error:", it) } ?: console.log(meta, message) + LoggingLevel.TRACE -> throwable?.let { console.log(meta, message, "Error:", it) } ?: console.log(meta, message) } } companion object : LoggerFactory { override fun logger(tag: String): Logger = ConsoleLogger(tag) - fun witLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { ConsoleLogger(it, minLevel) } + fun withLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { ConsoleLogger(it, minLevel) } } } diff --git a/rsocket-transport-test/build.gradle.kts b/rsocket-test/build.gradle.kts similarity index 77% rename from rsocket-transport-test/build.gradle.kts rename to rsocket-test/build.gradle.kts index 8aab4fef8..794a4e095 100644 --- a/rsocket-transport-test/build.gradle.kts +++ b/rsocket-test/build.gradle.kts @@ -25,19 +25,19 @@ kotlin { sourceSets { val commonMain by getting { dependencies { - implementation(project(":rsocket-core")) - implementation(kotlin("test-common")) - implementation(kotlin("test-annotations-common")) + api(project(":rsocket-core")) + api(kotlin("test-common")) + api(kotlin("test-annotations-common")) } } val jvmMain by getting { dependencies { - implementation(kotlin("test-junit")) + api(kotlin("test-junit")) } } val jsMain by getting { dependencies { - implementation(kotlin("test-js")) + api(kotlin("test-js")) } } } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt new file mode 100644 index 000000000..9853ae664 --- /dev/null +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.time.* + +interface SuspendTest { + val testTimeout: Duration get() = 1.minutes + + val beforeTimeout: Duration get() = 10.seconds + val afterTimeout: Duration get() = 10.seconds + + val debug: Boolean get() = false //change to debug tests for additional logs + + suspend fun before(): Unit = Unit + suspend fun after(): Unit = Unit + + fun test(timeout: Duration = testTimeout, block: suspend CoroutineScope.() -> Unit) = runTest { + + runCatching { + if (debug) println("[TEST] BEFORE started") + withTimeout(beforeTimeout) { before() } + }.onSuccess { + if (debug) println("[TEST] BEFORE completed") + }.onFailure { + if (debug) println("[TEST] BEFORE failed with error: ${it.stackTraceToString()}") + } + + val result = runCatching { + withTimeout(timeout) { block() } + } + + runCatching { + if (debug) println("[TEST] AFTER started") + withTimeout(afterTimeout) { after() } + }.onSuccess { + if (debug) println("[TEST] AFTER completed") + }.onFailure { + if (debug) println("[TEST] AFTER failed with error: ${it.stackTraceToString()}") + } + + result.getOrThrow() + } + + suspend fun currentJob(): Job = coroutineContext[Job]!! +} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/Test.common.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt similarity index 83% rename from rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/Test.common.kt rename to rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt index 5e1e4b29c..854ec5924 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/Test.common.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt @@ -14,11 +14,10 @@ * limitations under the License. */ -package io.rsocket.kotlin +package io.rsocket.kotlin.test import kotlinx.coroutines.* -import kotlin.time.* -expect fun test(timeout: Duration? = 10.seconds, block: suspend CoroutineScope.() -> Unit) +internal expect fun runTest(block: suspend CoroutineScope.() -> Unit) expect val anotherDispatcher: CoroutineDispatcher diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt similarity index 98% rename from rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt rename to rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt index a4a83f13c..8da4a2a5e 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.rsocket.kotlin +package io.rsocket.kotlin.test import io.ktor.utils.io.core.* import io.rsocket.kotlin.connection.* diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt new file mode 100644 index 000000000..97a9f1d11 --- /dev/null +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +class TestRSocket : RSocket { + override val job: Job = Job() + + override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.release() + + override suspend fun fireAndForget(payload: Payload): Unit = payload.release() + + override suspend fun requestResponse(payload: Payload): Payload { + payload.release() + return Payload(data, metadata) + } + + override fun requestStream(payload: Payload): Flow = flow { + repeat(10000) { emit(requestResponse(payload)) } + } + + override fun requestChannel(payloads: Flow): Flow = flow { + payloads.collect { emit(it) } + } + + companion object { + const val data = "hello world" + const val metadata = "metadata" + } +} diff --git a/rsocket-transport-test/src/commonMain/kotlin/io/rsocket/kotlin/Test.common.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt similarity index 73% rename from rsocket-transport-test/src/commonMain/kotlin/io/rsocket/kotlin/Test.common.kt rename to rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt index 37e6dd7d0..35d03d2d7 100644 --- a/rsocket-transport-test/src/commonMain/kotlin/io/rsocket/kotlin/Test.common.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt @@ -14,12 +14,14 @@ * limitations under the License. */ -package io.rsocket.kotlin +package io.rsocket.kotlin.test import kotlinx.coroutines.* -import kotlin.time.* -@OptIn(ExperimentalTime::class) -expect fun test(timeout: Duration? = 10.seconds, block: suspend CoroutineScope.() -> Unit) +abstract class TestWithConnection : SuspendTest { + val connection: TestConnection = TestConnection() -fun readLargePayload(name: String): String = name.repeat(1000) + override suspend fun after() { + connection.job.cancelAndJoin() + } +} diff --git a/rsocket-transport-test/src/commonMain/kotlin/io/rsocket/kotlin/TransportTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt similarity index 52% rename from rsocket-transport-test/src/commonMain/kotlin/io/rsocket/kotlin/TransportTest.kt rename to rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt index 47b836462..ca15ab3df 100644 --- a/rsocket-transport-test/src/commonMain/kotlin/io/rsocket/kotlin/TransportTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt @@ -14,9 +14,10 @@ * limitations under the License. */ -package io.rsocket.kotlin +package io.rsocket.kotlin.test import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* @@ -26,72 +27,49 @@ import kotlin.test.* import kotlin.time.* @OptIn(ExperimentalTime::class) -abstract class TransportTest(private val timeout: Duration = 10.minutes) { - private var client: RSocket? = null +abstract class TransportTest : SuspendTest { + override val testTimeout: Duration = 10.minutes - abstract suspend fun init(): RSocket + lateinit var client: RSocket //should be assigned in `before` - private suspend fun client(): RSocket = when (val c = client) { - null -> init().also { client = it } - else -> c - } - - @AfterTest - fun clean() = test { - client().job.cancelAndJoin() - } - - @Suppress("FunctionName") - private fun Payload(metadataPresent: Int): Payload { - val metadata = when (metadataPresent % 5) { - 0 -> null - 1 -> "" - else -> MOCK_METADATA - } - return Payload(MOCK_DATA, metadata) + override suspend fun after() { + client.job.cancelAndJoin() } @Test - fun fireAndForget10() = test(timeout) { - val client = client() + fun fireAndForget10() = test { (1..10).map { async { client.fireAndForget(Payload(it)) } }.awaitAll() } @Test - fun largePayloadFireAndForget10() = test(timeout) { - val client = client() + fun largePayloadFireAndForget10() = test { (1..10).map { async { client.fireAndForget(LARGE_PAYLOAD) } }.awaitAll() } @Test - fun metadataPush10() = test(timeout) { - val client = client() - (1..10).map { async { client.metadataPush(ByteReadPacket(MOCK_DATA.encodeToByteArray())) } }.awaitAll() + fun metadataPush10() = test { + (1..10).map { async { client.metadataPush(packet(MOCK_DATA)) } }.awaitAll() } @Test - fun largePayloadMetadataPush10() = test(timeout) { - val client = client() - (1..10).map { async { client.metadataPush(ByteReadPacket(LARGE_DATA.encodeToByteArray())) } }.awaitAll() + fun largePayloadMetadataPush10() = test { + (1..10).map { async { client.metadataPush(packet(LARGE_DATA)) } }.awaitAll() } @Test fun requestChannel0() = test(10.seconds) { - val client = client() val list = client.requestChannel(emptyFlow()).toList() assertTrue(list.isEmpty()) } @Test fun requestChannel1() = test(10.seconds) { - val client = client() val list = client.requestChannel(flowOf(Payload(0))).onEach { it.release() }.toList() assertEquals(1, list.size) } @Test - fun requestChannel3() = test(timeout) { - val client = client() + fun requestChannel3() = test { val request = flow { repeat(3) { emit(Payload(it)) } } @@ -100,8 +78,7 @@ abstract class TransportTest(private val timeout: Duration = 10.minutes) { } @Test - fun largePayloadRequestChannel200() = test(timeout) { - val client = client() + fun largePayloadRequestChannel200() = test { val request = flow { repeat(200) { emit(LARGE_PAYLOAD) } } @@ -110,8 +87,7 @@ abstract class TransportTest(private val timeout: Duration = 10.minutes) { } @Test - fun requestChannel20000() = test(timeout) { - val client = client() + fun requestChannel20000() = test { val request = flow { repeat(20_000) { emit(Payload(7)) } } @@ -123,8 +99,7 @@ abstract class TransportTest(private val timeout: Duration = 10.minutes) { } @Test - fun requestChannel200000() = test(timeout) { - val client = client() + fun requestChannel200000() = test { val request = flow { repeat(200_000) { emit(Payload(it)) } } @@ -133,8 +108,7 @@ abstract class TransportTest(private val timeout: Duration = 10.minutes) { } @Test - fun requestChannel256x512() = test(null) { - val client = client() + fun requestChannel256x512() = test { val request = flow { repeat(512) { emit(Payload(it)) @@ -142,104 +116,83 @@ abstract class TransportTest(private val timeout: Duration = 10.minutes) { } (0..256).map { async(Dispatchers.Default) { - withTimeout(timeout) { - val list = client.requestChannel(request).onEach { it.release() }.toList() - assertEquals(512, list.size) - } + val list = client.requestChannel(request).onEach { it.release() }.toList() + assertEquals(512, list.size) } }.awaitAll() } @Test - fun requestResponse1() = test(timeout) { - val client = client() - client.requestResponse(Payload(1)).let(::checkPayload) + fun requestResponse1() = test { + client.requestResponse(Payload(1)).let(Companion::checkPayload) } @Test - fun requestResponse10() = test(timeout) { - val client = client() - (1..10).map { async { client.requestResponse(Payload(it)).let(::checkPayload) } }.awaitAll() + fun requestResponse10() = test { + (1..10).map { async { client.requestResponse(Payload(it)).let(Companion::checkPayload) } }.awaitAll() } @Test - fun requestResponse100() = test(timeout) { - val client = client() - (1..100).map { async { client.requestResponse(Payload(it)).let(::checkPayload) } }.awaitAll() + fun requestResponse100() = test { + (1..100).map { async { client.requestResponse(Payload(it)).let(Companion::checkPayload) } }.awaitAll() } @Test - fun largePayloadRequestResponse100() = test(timeout) { - val client = client() + fun largePayloadRequestResponse100() = test { (1..100).map { async { client.requestResponse(LARGE_PAYLOAD) } }.awaitAll().onEach { it.release() } } @Test - fun requestResponse10000() = test(timeout) { - val client = client() - (1..10000).map { async { client.requestResponse(Payload(3)).let(::checkPayload) } }.awaitAll() + fun requestResponse10000() = test { + (1..10000).map { async { client.requestResponse(Payload(3)).let(Companion::checkPayload) } }.awaitAll() } @Test - fun requestResponse100000() = test(timeout) { - val client = client() - repeat(100000) { client.requestResponse(Payload(3)).let(::checkPayload) } + fun requestResponse100000() = test { + repeat(100000) { client.requestResponse(Payload(3)).let(Companion::checkPayload) } } @Test - fun requestStream5() = test(timeout) { - val client = client() - val list = client.requestStream(Payload(3)).onEach { checkPayload(it) }.take(5).toList() + fun requestStream5() = test { + val list = client.requestStream(Payload(3)).onEach { checkPayload(it) }.buffer(5).take(5).toList() assertEquals(5, list.size) } @Test - fun requestStream10000() = test(timeout) { - val client = client() + fun requestStream10000() = test { val list = client.requestStream(Payload(3)).onEach { checkPayload(it) }.toList() assertEquals(10000, list.size) } - fun checkPayload(payload: Payload) { - assertEquals(TestRSocket.data, payload.data.readText()) - assertEquals(TestRSocket.metadata, payload.metadata?.readText()) - } - companion object { - val MOCK_DATA: String = "test-data" - val MOCK_METADATA: String = "metadata" - val LARGE_DATA by lazy { readLargePayload("words.shakespeare.txt.gz") } - private val payload by lazy { Payload(LARGE_DATA, LARGE_DATA) } - val LARGE_PAYLOAD get() = payload.copy() val ACCEPTOR: RSocketAcceptor = { TestRSocket() } val CONNECTOR_CONFIG = RSocketConnectorConfiguration(keepAlive = KeepAlive(10.minutes, 100.minutes)) val SERVER_CONFIG = RSocketServerConfiguration() - } -} -class TestRSocket : RSocket { - override val job: Job = Job() - - override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.release() - - override suspend fun fireAndForget(payload: Payload): Unit = payload.release() + val MOCK_DATA: String = "test-data" + val MOCK_METADATA: String = "metadata" + val LARGE_DATA by lazy { readLargePayload("words.shakespeare.txt.gz") } + private val payload by lazy { Payload(LARGE_DATA, LARGE_DATA) } + val LARGE_PAYLOAD get() = payload.copy() - override suspend fun requestResponse(payload: Payload): Payload { - payload.release() - return Payload(data, metadata) - } + private fun packet(text: String): ByteReadPacket = buildPacket { writeText(text) } - override fun requestStream(payload: Payload): Flow = flow { - repeat(10000) { emit(requestResponse(payload)) } - } + private fun readLargePayload(name: String): String = name.repeat(1000) - override fun requestChannel(payloads: Flow): Flow = flow { - payloads.collect { emit(it) } - } + @Suppress("FunctionName") + private fun Payload(metadataPresent: Int): Payload { + val metadata = when (metadataPresent % 5) { + 0 -> null + 1 -> "" + else -> MOCK_METADATA + } + return Payload(MOCK_DATA, metadata) + } - companion object { - const val data = "hello world" - const val metadata = "metadata" + fun checkPayload(payload: Payload) { + assertEquals(TestRSocket.data, payload.data.readText()) + assertEquals(TestRSocket.metadata, payload.metadata?.readText()) + } } } diff --git a/rsocket-core/src/jsTest/kotlin/io/rsocket/kotlin/Test.kt b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt similarity index 75% rename from rsocket-core/src/jsTest/kotlin/io/rsocket/kotlin/Test.kt rename to rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt index 5fa2ef1ee..f4f97196e 100644 --- a/rsocket-core/src/jsTest/kotlin/io/rsocket/kotlin/Test.kt +++ b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -14,17 +14,11 @@ * limitations under the License. */ -package io.rsocket.kotlin +package io.rsocket.kotlin.test import kotlinx.coroutines.* -import kotlin.time.* -actual fun test(timeout: Duration?, block: suspend CoroutineScope.() -> Unit): dynamic = GlobalScope.promise { - when (timeout) { - null -> block() - else -> withTimeout(timeout) { block() } - } -} +internal actual fun runTest(block: suspend CoroutineScope.() -> Unit): dynamic = GlobalScope.promise(block = block) //JS is single threaded, so it have only one dispatcher backed by one threed actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.Default diff --git a/rsocket-core/src/jvmTest/kotlin/io/rsocket/kotlin/Test.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt similarity index 67% rename from rsocket-core/src/jvmTest/kotlin/io/rsocket/kotlin/Test.kt rename to rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt index 4821b80b2..fd7645189 100644 --- a/rsocket-core/src/jvmTest/kotlin/io/rsocket/kotlin/Test.kt +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -14,22 +14,18 @@ * limitations under the License. */ -package io.rsocket.kotlin +package io.rsocket.kotlin.test import kotlinx.coroutines.* import java.io.* import java.util.logging.* -import kotlin.time.* -actual fun test(timeout: Duration?, block: suspend CoroutineScope.() -> Unit): Unit = runBlocking { +internal actual fun runTest(block: suspend CoroutineScope.() -> Unit) { //init logger - LogManager - .getLogManager() - .readConfiguration(File("src/jvmTest/resources/logging.properties").inputStream()) - when (timeout) { - null -> block() - else -> withTimeout(timeout) { block() } - } + val file = File("src/jvmTest/resources/logging.properties") + if (file.exists()) LogManager.getLogManager().readConfiguration(file.inputStream()) + + runBlocking(block = block) } actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.IO diff --git a/rsocket-transport-ktor/build.gradle.kts b/rsocket-transport-ktor/build.gradle.kts index d291c380e..1d9e3a358 100644 --- a/rsocket-transport-ktor/build.gradle.kts +++ b/rsocket-transport-ktor/build.gradle.kts @@ -36,7 +36,7 @@ kotlin { } val commonTest by getting { dependencies { - implementation(project(":rsocket-transport-test")) + implementation(project(":rsocket-transport-ktor-client")) } } val jvmTest by getting { @@ -49,7 +49,6 @@ kotlin { implementation("io.ktor:ktor-server-jetty:1.4.0") implementation("io.ktor:ktor-server-tomcat:1.4.0") - implementation(project(":rsocket-transport-ktor-client")) implementation(project(":rsocket-transport-ktor-server")) } } diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt index 8b5928e70..326abeeed 100644 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/TcpTransportTest.kt @@ -20,8 +20,8 @@ import io.ktor.network.selector.* import io.ktor.network.sockets.* import io.ktor.util.* import io.rsocket.kotlin.connection.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* -import kotlin.test.* class TcpTransportTest : TransportTest() { @OptIn(InternalAPI::class) @@ -29,20 +29,22 @@ class TcpTransportTest : TransportTest() { private val builder = aSocket(selector).tcp() private val server = builder.bind() - @BeforeTest - fun setup() { + override suspend fun before() { + super.before() + GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) { - server.accept().connection.startServer(SERVER_CONFIG, ACCEPTOR).join() + server.accept().connection.startServer(SERVER_CONFIG, ACCEPTOR) } + + client = builder.connect(server.localAddress).connection.connectClient(CONNECTOR_CONFIG) } - @AfterTest - fun cleanup() { + override suspend fun after() { + super.after() + server.close() selector.close() - runBlocking { server.socketContext.join() } + server.socketContext.join() } - override suspend fun init(): RSocket = - builder.connect(server.localAddress).connection.connectClient(CONNECTOR_CONFIG) } diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt index 4aee8d85a..c59ed6ca6 100644 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/WebSocketTransportTest.kt @@ -22,8 +22,8 @@ import io.ktor.client.engine.* import io.ktor.routing.* import io.ktor.server.engine.* import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* -import kotlin.test.* import io.ktor.client.features.websocket.WebSockets as ClientWebSockets import io.ktor.websocket.WebSockets as ServerWebSockets @@ -49,18 +49,17 @@ abstract class WebSocketTransportTest( } } - @BeforeTest - fun setup() { - server.start() - } + override suspend fun before() { + super.before() - @AfterTest - fun cleanup() { - server.stop(0, 0) + trySeveralTimes { server.start() } + client = trySeveralTimes { httpClient.rSocket(port = 9000) } } - override suspend fun init(): RSocket = trySeveralTimes { - httpClient.rSocket(port = 9000) + override suspend fun after() { + super.after() + + server.stop(0, 0) } private suspend inline fun trySeveralTimes(block: () -> R): R { diff --git a/rsocket-transport-local/build.gradle.kts b/rsocket-transport-local/build.gradle.kts index 777767370..c6fd80329 100644 --- a/rsocket-transport-local/build.gradle.kts +++ b/rsocket-transport-local/build.gradle.kts @@ -32,10 +32,5 @@ kotlin { api(project(":rsocket-core")) } } - val commonTest by getting { - dependencies { - implementation(project(":rsocket-transport-test")) - } - } } } diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/connection/LocalConnection.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/connection/LocalConnection.kt index b566b2b7f..f54e2b85f 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/connection/LocalConnection.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/connection/LocalConnection.kt @@ -42,10 +42,12 @@ class LocalConnection( * Returns pair of client and server local connections */ @Suppress("FunctionName") -fun SimpleLocalConnection(): Pair { +fun SimpleLocalConnection(parentJob: Job? = null): Pair { val clientChannel = Channel(Channel.UNLIMITED) val serverChannel = Channel(Channel.UNLIMITED) - val serverConnection = LocalConnection("server", clientChannel, serverChannel) - val clientConnection = LocalConnection("client", serverChannel, clientChannel) + + val clientConnection = LocalConnection("client", serverChannel, clientChannel, parentJob) + val serverConnection = LocalConnection("server", clientChannel, serverChannel, parentJob) + return clientConnection to serverConnection } diff --git a/rsocket-transport-local/src/jvmTest/kotlin/io/rsocket/kotlin/LocalTransportTest.kt b/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/LocalTransportTest.kt similarity index 79% rename from rsocket-transport-local/src/jvmTest/kotlin/io/rsocket/kotlin/LocalTransportTest.kt rename to rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/LocalTransportTest.kt index 9e1b18ebd..493bc3cfb 100644 --- a/rsocket-transport-local/src/jvmTest/kotlin/io/rsocket/kotlin/LocalTransportTest.kt +++ b/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/LocalTransportTest.kt @@ -18,20 +18,33 @@ package io.rsocket.kotlin import io.ktor.utils.io.core.* import io.rsocket.kotlin.connection.* +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* class LocalTransportTest : TransportTest() { - override suspend fun init(): RSocket { + + private val testJob: Job = Job() + + override suspend fun before() { + super.before() + val clientChannel = Channel(Channel.UNLIMITED) val serverChannel = Channel(Channel.UNLIMITED) - val serverConnection = LocalConnection("server", clientChannel, serverChannel) - val clientConnection = LocalConnection("client", serverChannel, clientChannel) - return coroutineScope { + + val clientConnection = LocalConnection("client", serverChannel, clientChannel, testJob) + val serverConnection = LocalConnection("server", clientChannel, serverChannel, testJob) + client = coroutineScope { launch { serverConnection.startServer(SERVER_CONFIG, ACCEPTOR) } clientConnection.connectClient(CONNECTOR_CONFIG) } } + + override suspend fun after() { + + super.after() + testJob.cancelAndJoin() + } } diff --git a/rsocket-transport-test/src/jsMain/kotlin/io/rsocket/kotlin/Test.kt b/rsocket-transport-test/src/jsMain/kotlin/io/rsocket/kotlin/Test.kt deleted file mode 100644 index fee5bc194..000000000 --- a/rsocket-transport-test/src/jsMain/kotlin/io/rsocket/kotlin/Test.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin - -import kotlinx.coroutines.* -import kotlin.time.* - -@OptIn(ExperimentalTime::class) -actual fun test(timeout: Duration?, block: suspend CoroutineScope.() -> Unit): dynamic = GlobalScope.promise { - when (timeout) { - null -> block() - else -> withTimeout(timeout) { block() } - } -} diff --git a/rsocket-transport-test/src/jvmMain/kotlin/io/rsocket/kotlin/Test.kt b/rsocket-transport-test/src/jvmMain/kotlin/io/rsocket/kotlin/Test.kt deleted file mode 100644 index 6d0cd39db..000000000 --- a/rsocket-transport-test/src/jvmMain/kotlin/io/rsocket/kotlin/Test.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin - -import kotlinx.coroutines.* -import kotlin.time.* - -@OptIn(ExperimentalTime::class) -actual fun test(timeout: Duration?, block: suspend CoroutineScope.() -> Unit): Unit = runBlocking { - when (timeout) { - null -> block() - else -> withTimeout(timeout) { block() } - } -} - diff --git a/settings.gradle.kts b/settings.gradle.kts index 05898fb8b..288e3b530 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -33,7 +33,8 @@ include("benchmarks") include("playground") include("rsocket-core") -include("rsocket-transport-test") +include("rsocket-test") + include("rsocket-transport-local") include("rsocket-transport-ktor")