Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
}
}

override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload(
ByteArray(size / 2).also { Random.nextBytes(it) },
ByteArray(size / 2).also { Random.nextBytes(it) }
override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload.wrap(
data = ByteArray(size / 2).also { Random.nextBytes(it) },
metadata = ByteArray(size / 2).also { Random.nextBytes(it) }
)

override fun releasePayload(payload: Payload) {
Expand Down
38 changes: 20 additions & 18 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import com.jfrog.bintray.gradle.*
import com.jfrog.bintray.gradle.tasks.*
import org.gradle.api.publish.maven.internal.artifact.*
import org.jetbrains.kotlin.gradle.dsl.*
import org.jetbrains.kotlin.gradle.plugin.*
import org.jetbrains.kotlin.gradle.targets.js.*
import org.jetbrains.kotlin.gradle.targets.jvm.*
import org.jfrog.gradle.plugin.artifactory.dsl.*
Expand Down Expand Up @@ -62,8 +61,21 @@ subprojects {
is KotlinJsTarget -> {
useCommonJs()
//configure running tests for JS
nodejs { testTask { useKarma { useChromeHeadless() } } }
browser { testTask { useKarma { useChromeHeadless() } } }
nodejs {
testTask {
useMocha {
timeout = "600s"
}
}
}
browser {
testTask {
useKarma {
useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d"))
useChromeHeadless()
}
}
}
}
is KotlinJvmTarget -> {
compilations.all {
Expand All @@ -81,7 +93,7 @@ subprojects {

useExperimentalAnnotation("kotlin.RequiresOptIn")

if (name.contains("test", ignoreCase = true)) {
if (name.contains("test", ignoreCase = true) || project.name == "rsocket-test") {
useExperimentalAnnotation("kotlin.time.ExperimentalTime")
useExperimentalAnnotation("kotlin.ExperimentalStdlibApi")
useExperimentalAnnotation("kotlinx.coroutines.ExperimentalCoroutinesApi")
Expand All @@ -94,20 +106,10 @@ subprojects {
}
}


//should be not needed after ~1.4.20 when kotlin-test will be added automatically
val commonTest by sourceSets.getting {
dependencies {
implementation(kotlin("test-common"))
implementation(kotlin("test-annotations-common"))
}
}
targets.all {
compilations.findByName("test")?.dependencies {
when (platformType) {
KotlinPlatformType.jvm -> implementation(kotlin("test-junit"))
KotlinPlatformType.js -> implementation(kotlin("test-js"))
else -> Unit
if (project.name != "rsocket-test") {
val commonTest by sourceSets.getting {
dependencies {
implementation(project(":rsocket-test"))
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions js/karma.config.d/karma.conf.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
config.client = config.client || {}
config.client.mocha = config.client.mocha || {}
config.client.mocha.timeout = '600s'
config.browserNoActivityTimeout = 600000
config.browserDisconnectTimeout = 600000
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,42 @@
* limitations under the License.
*/

package tcp

import doSomething
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.*
import io.rsocket.kotlin.connection.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import java.util.concurrent.*

@OptIn(KtorExperimentalAPI::class)
fun main(): Unit = runBlocking {
val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()
val socket = aSocket(ActorSelectorManager(dispatcher)).tcp().connect("127.0.0.1", 2323)

val client = socket.connection.connectClient()
try {
client.doSomething()
} catch (e: Throwable) {
dispatcher.close()
throw e
}
import kotlin.coroutines.*

@OptIn(KtorExperimentalAPI::class, InternalAPI::class)
suspend fun runTcpClient(dispatcher: CoroutineContext): Unit {
aSocket(SelectorManager(dispatcher))
.tcp()
.connect("127.0.0.1", 2323)
.connection
.connectClient()
.doSomething()
}

@OptIn(KtorExperimentalAPI::class, InternalAPI::class)
suspend fun runTcpServer(dispatcher: CoroutineContext): Unit {
aSocket(SelectorManager(dispatcher))
.tcp()
.bind("127.0.0.1", 2323)
.rSocket(acceptor = rSocketAcceptor)
}

//to test nodejs tcp server
@OptIn(KtorExperimentalAPI::class)
fun main2(): Unit = runBlocking {
val socket = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect("127.0.0.1", 9000)

val client = socket.connection.connectClient()
@OptIn(KtorExperimentalAPI::class, InternalAPI::class)
suspend fun testNodeJsServer(dispatcher: CoroutineContext) {
val client =
aSocket(SelectorManager(dispatcher))
.tcp()
.connect("127.0.0.1", 9000)
.connection
.connectClient()

val response = client.requestResponse(Payload("Hello from JVM"))

println(response.data.readText())
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@
* limitations under the License.
*/

package ws

import doSomething
import io.ktor.client.*
import io.ktor.client.engine.*
import io.ktor.client.features.websocket.*
import io.ktor.util.*
import io.rsocket.kotlin.core.*

expect val engine: HttpClientEngineFactory<*>

@OptIn(KtorExperimentalAPI::class)
suspend fun run() {
suspend fun runWSClient(engine: HttpClientEngineFactory<*>) {
val client = HttpClient(engine) {
install(WebSockets)
install(RSocketClientSupport)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
* limitations under the License.
*/

package ws

import io.ktor.client.engine.*
import io.ktor.client.engine.js.*

actual val engine: HttpClientEngineFactory<*> = Js

suspend fun main() = run()
suspend fun main(): Unit = runWSClient(Js)
19 changes: 19 additions & 0 deletions playground/src/jvmMain/kotlin/TcpClientApp.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import kotlinx.coroutines.*

suspend fun main(): Unit = runTcpClient(Dispatchers.IO)
19 changes: 19 additions & 0 deletions playground/src/jvmMain/kotlin/TcpServerApp.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import kotlinx.coroutines.*

suspend fun main(): Unit = runTcpServer(Dispatchers.IO)
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@
* limitations under the License.
*/

package ws

import io.ktor.client.engine.*
import io.ktor.client.engine.cio.*
import io.ktor.util.*

@OptIn(KtorExperimentalAPI::class)
actual val engine: HttpClientEngineFactory<*> = CIO

suspend fun main() = run()

suspend fun main(): Unit = runWSClient(CIO)
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@
* limitations under the License.
*/

package ws

import io.ktor.application.*
import io.ktor.routing.*
import io.ktor.server.cio.*
import io.ktor.server.engine.*
import io.ktor.util.*
import io.rsocket.kotlin.core.*
import rSocketAcceptor

@OptIn(KtorExperimentalAPI::class)
fun main() {
Expand Down
33 changes: 0 additions & 33 deletions playground/src/jvmMain/kotlin/tcp/TCPServer.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ internal class RSocketResponder(
fun handlerRequestResponse(frame: RequestFrame): Unit = with(state) {
val streamId = frame.streamId
launchCancelable(streamId) {
val response = requestOrThrow(streamId) {
val response = requestOrCancel(streamId) {
requestHandler.requestResponse(frame.payload)
}
} ?: return@launchCancelable
if (isActive) send(NextCompletePayloadFrame(streamId, response))
}
}

fun handleRequestStream(initFrame: RequestFrame): Unit = with(state) {
val streamId = initFrame.streamId
launchCancelable(streamId) {
val response = requestOrThrow(streamId) {
val response = requestOrCancel(streamId) {
requestHandler.requestStream(initFrame.payload)
}
} ?: return@launchCancelable
response.collectLimiting(
streamId,
RequestStreamResponderFlowCollector(state, streamId, initFrame.initialRequest)
Expand All @@ -68,9 +68,9 @@ internal class RSocketResponder(
val request = RequestChannelResponderFlow(streamId, receiver, state)

launchCancelable(streamId) {
val response = requestOrThrow(streamId) {
val response = requestOrCancel(streamId) {
requestHandler.requestChannel(request)
}
} ?: return@launchCancelable
response.collectLimiting(
streamId,
RequestStreamResponderFlowCollector(state, streamId, initFrame.initialRequest)
Expand All @@ -80,13 +80,15 @@ internal class RSocketResponder(
}
}

private inline fun <T> CoroutineScope.requestOrThrow(streamId: Int, block: () -> T): T {
return try {
private inline fun <T : Any> CoroutineScope.requestOrCancel(streamId: Int, block: () -> T): T? =
try {
block()
} catch (e: Throwable) {
if (isActive) state.send(ErrorFrame(streamId, e))
throw e
if (isActive) {
state.send(ErrorFrame(streamId, e))
cancel("Request handling failed", e) //KLUDGE: can be related to IR, because using `throw` fails on JS IR and Native
}
null
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,16 @@ internal class RSocketState(
suspend inline fun Flow<Payload>.collectLimiting(
streamId: Int,
limitingCollector: LimitingFlowCollector,
) {
): Unit = coroutineScope {
limits[streamId] = limitingCollector
try {
collect(limitingCollector)
send(CompletePayloadFrame(streamId))
} catch (e: Throwable) {
limits.remove(streamId)
//if isn't active, then, that stream was cancelled, and so no need for error frame
if (currentCoroutineContext().isActive) send(ErrorFrame(streamId, e))
throw e
if (isActive) send(ErrorFrame(streamId, e))
cancel("Collect failed", e) //KLUDGE: can be related to IR, because using `throw` fails on JS IR and Native
}
}

Expand All @@ -114,7 +114,7 @@ internal class RSocketState(
is RequestNFrame -> limits[streamId]?.updateRequests(frame.requestN)
is CancelFrame -> senders.remove(streamId)?.cancel()
is ErrorFrame -> receivers.remove(streamId)?.close(frame.throwable)
is RequestFrame -> when (frame.type) {
is RequestFrame -> when (frame.type) {
FrameType.Payload -> receivers[streamId]?.offer(frame)
FrameType.RequestFnF -> responder.handleFireAndForget(frame)
FrameType.RequestResponse -> responder.handlerRequestResponse(frame)
Expand Down
Loading