diff --git a/build-logic/src/main/kotlin/rsocketbuild/TestOptIn.kt b/build-logic/src/main/kotlin/rsocketbuild/TestOptIn.kt index bd4280d1..17d8b8d0 100644 --- a/build-logic/src/main/kotlin/rsocketbuild/TestOptIn.kt +++ b/build-logic/src/main/kotlin/rsocketbuild/TestOptIn.kt @@ -30,6 +30,7 @@ fun LanguageSettingsBuilder.optInForTest() { optIn("kotlinx.coroutines.DelicateCoroutinesApi") optIn("io.rsocket.kotlin.TransportApi") + optIn("io.rsocket.kotlin.transport.RSocketTransportApi") optIn("io.rsocket.kotlin.ExperimentalMetadataApi") optIn("io.rsocket.kotlin.ExperimentalStreamsApi") optIn("io.rsocket.kotlin.RSocketLoggingApi") diff --git a/rsocket-core/api/rsocket-core.api b/rsocket-core/api/rsocket-core.api index bdce403b..0434e513 100644 --- a/rsocket-core/api/rsocket-core.api +++ b/rsocket-core/api/rsocket-core.api @@ -207,6 +207,7 @@ public abstract interface class io/rsocket/kotlin/core/MimeTypeWithName : io/rso public final class io/rsocket/kotlin/core/RSocketConnector { public final fun connect (Lio/rsocket/kotlin/transport/ClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun connect (Lio/rsocket/kotlin/transport/RSocketClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class io/rsocket/kotlin/core/RSocketConnectorBuilder { @@ -241,6 +242,8 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt { public final class io/rsocket/kotlin/core/RSocketServer { public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object; public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object; + public final fun createAcceptor (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketServerAcceptor; + public final fun start (Lio/rsocket/kotlin/transport/RSocketServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class io/rsocket/kotlin/core/RSocketServerBuilder { @@ -820,6 +823,59 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt { public static final fun ClientTransport (Lkotlin/coroutines/CoroutineContext;Lio/rsocket/kotlin/transport/ClientTransport;)Lio/rsocket/kotlin/transport/ClientTransport; } +public abstract interface class io/rsocket/kotlin/transport/RSocketClientTransport : io/rsocket/kotlin/transport/RSocketTransport { + public abstract fun createSession (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketServerAcceptor { + public abstract fun acceptSession (Lio/rsocket/kotlin/transport/RSocketTransportSession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstance : kotlinx/coroutines/CoroutineScope { +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketServerTransport : io/rsocket/kotlin/transport/RSocketTransport { + public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketServerAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope { +} + +public abstract interface annotation class io/rsocket/kotlin/transport/RSocketTransportApi : java/lang/annotation/Annotation { +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketTransportBuilder { + public abstract fun buildTransport (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport; +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketTransportEngine : kotlinx/coroutines/CoroutineScope { + public abstract fun createTransport (Ljava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport; +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketTransportEngineBuilder { + public abstract fun buildEngine (Lkotlin/coroutines/CoroutineContext;)Lio/rsocket/kotlin/transport/RSocketTransportEngine; +} + +public abstract class io/rsocket/kotlin/transport/RSocketTransportEngineFactory { + public fun (Lkotlin/jvm/functions/Function0;)V + public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransportEngine; + public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportEngineFactory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransportEngine; +} + +public abstract class io/rsocket/kotlin/transport/RSocketTransportFactory { + public fun (Lkotlin/jvm/functions/Function0;)V + public final fun invoke (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransport; + public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportFactory;Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport; +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession : kotlinx/coroutines/CoroutineScope { +} + +public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession$Sequential : io/rsocket/kotlin/transport/RSocketTransportSession { + public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public abstract interface class io/rsocket/kotlin/transport/ServerTransport { public abstract fun start (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object; } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt index 8a80a9d6..08edb370 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt @@ -19,8 +19,6 @@ package io.rsocket.kotlin import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* -import io.rsocket.kotlin.frame.* -import io.rsocket.kotlin.internal.* import kotlinx.coroutines.* /** @@ -35,14 +33,3 @@ public interface Connection : CoroutineScope { public suspend fun send(packet: ByteReadPacket) public suspend fun receive(): ByteReadPacket } - -@Suppress("DEPRECATION") -@OptIn(TransportApi::class) -internal suspend inline fun Connection.receiveFrame(bufferPool: ObjectPool, block: (frame: Frame) -> T): T = - receive().readFrame(bufferPool).closeOnError(block) - -@Suppress("DEPRECATION") -@OptIn(TransportApi::class) -internal suspend fun Connection.sendFrame(bufferPool: ObjectPool, frame: Frame) { - frame.toPacket(bufferPool).closeOnError { send(it) } -} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index 952c268c..2e63ab40 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -25,8 +25,9 @@ import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* +import kotlin.coroutines.* -@OptIn(TransportApi::class, RSocketLoggingApi::class) +@OptIn(TransportApi::class, RSocketTransportApi::class, RSocketLoggingApi::class) public class RSocketConnector internal constructor( private val loggerFactory: LoggerFactory, private val maxFragmentSize: Int, @@ -37,19 +38,33 @@ public class RSocketConnector internal constructor( @Suppress("DEPRECATION") private val bufferPool: ObjectPool, ) { - public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) { - //TODO current coroutineContext job is overriden by transport coroutineContext jov - null -> withContext(transport.coroutineContext) { connectOnce(transport) } - else -> connectWithReconnect( - transport.coroutineContext, - loggerFactory.logger("io.rsocket.kotlin.connection"), - { connectOnce(transport) }, - reconnectPredicate, - ) + private val connectionLogger = loggerFactory.logger("io.rsocket.kotlin.connection") + private val frameLogger = loggerFactory.logger("io.rsocket.kotlin.frame") + + public suspend fun connect(transport: ClientTransport): RSocket = connect(object : RSocketClientTransport { + override val coroutineContext: CoroutineContext get() = transport.coroutineContext + override suspend fun createSession(): RSocketTransportSession { + return interceptors.wrapConnection(transport.connect()).convert() + } + }) + + public suspend fun connect(transport: RSocketClientTransport): RSocket { + return when (reconnectPredicate) { + //TODO current coroutineContext job is overriden by transport coroutineContext jov + null -> withContext(transport.coroutineContext) { connectOnce(transport) } + else -> connectWithReconnect( + transport.coroutineContext, + connectionLogger, + { connectOnce(transport) }, + reconnectPredicate, + ) + } } - private suspend fun connectOnce(transport: ClientTransport): RSocket { - val connection = transport.connect().wrapConnection() + private suspend fun connectOnce(transport: RSocketClientTransport): RSocket { + val connection = when (val connection = transport.createSession()) { + is RSocketTransportSession.Sequential -> connection.logging(frameLogger) + } val connectionConfig = try { connectionConfigProvider() } catch (cause: Throwable) { @@ -83,8 +98,4 @@ public class RSocketConnector internal constructor( throw cause } } - - private fun Connection.wrapConnection(): Connection = - interceptors.wrapConnection(this) - .logging(loggerFactory.logger("io.rsocket.kotlin.frame")) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt index 24de28be..7262b397 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt @@ -26,7 +26,7 @@ import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* -@OptIn(TransportApi::class, RSocketLoggingApi::class) +@OptIn(TransportApi::class, RSocketTransportApi::class, RSocketLoggingApi::class) public class RSocketServer internal constructor( private val loggerFactory: LoggerFactory, private val maxFragmentSize: Int, @@ -34,6 +34,8 @@ public class RSocketServer internal constructor( @Suppress("DEPRECATION") private val bufferPool: ObjectPool, ) { + private val frameLogger = loggerFactory.logger("io.rsocket.kotlin.frame") + @DelicateCoroutinesApi public fun bind( transport: ServerTransport, @@ -46,46 +48,59 @@ public class RSocketServer internal constructor( acceptor: ConnectionAcceptor, ): T = with(transport) { scope.start { - it.wrapConnection().bind(acceptor).join() + interceptors.wrapConnection(it).convert().bind(acceptor).join() + } + } + + public suspend fun start( + transport: RSocketServerTransport, + acceptor: ConnectionAcceptor, + ): T = transport.startServer(createAcceptor(acceptor)) + + @RSocketTransportApi + public fun createAcceptor(acceptor: ConnectionAcceptor): RSocketServerAcceptor = object : RSocketServerAcceptor { + override suspend fun acceptSession(session: RSocketTransportSession) { + when (session) { + is RSocketTransportSession.Sequential -> session.bind(acceptor) + } } } - private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame -> - when { - setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}")) - setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}")) - setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported")) - setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported")) - else -> try { - connect( - connection = this, - isServer = true, - maxFragmentSize = maxFragmentSize, - interceptors = interceptors, - connectionConfig = ConnectionConfig( - keepAlive = setupFrame.keepAlive, - payloadMimeType = setupFrame.payloadMimeType, - setupPayload = setupFrame.payload - ), - acceptor = acceptor, - bufferPool = bufferPool - ) - coroutineContext.job - } catch (e: Throwable) { - failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor")) + private suspend fun RSocketTransportSession.Sequential.bind( + acceptor: ConnectionAcceptor, + ): Job = with(logging(frameLogger)) { + receiveFrame(bufferPool) { setupFrame -> + when { + setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}")) + setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}")) + setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported")) + setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported")) + else -> try { + connect( + connection = this, + isServer = true, + maxFragmentSize = maxFragmentSize, + interceptors = interceptors, + connectionConfig = ConnectionConfig( + keepAlive = setupFrame.keepAlive, + payloadMimeType = setupFrame.payloadMimeType, + setupPayload = setupFrame.payload + ), + acceptor = acceptor, + bufferPool = bufferPool + ) + coroutineContext.job + } catch (e: Throwable) { + failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor")) + } } } } @Suppress("SuspendFunctionOnCoroutineScope") - private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing { + private suspend fun RSocketTransportSession.Sequential.failSetup(error: RSocketError.Setup): Nothing { sendFrame(bufferPool, ErrorFrame(0, error)) cancel("Connection establishment failed", error) throw error } - - private fun Connection.wrapConnection(): Connection = - interceptors.wrapConnection(this) - .logging(loggerFactory.logger("io.rsocket.kotlin.frame")) - } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/convert.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/convert.kt new file mode 100644 index 00000000..a41ebad3 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/convert.kt @@ -0,0 +1,37 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.core + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import kotlin.coroutines.* + +@TransportApi +@RSocketTransportApi +internal fun Connection.convert(): RSocketTransportSession.Sequential = object : RSocketTransportSession.Sequential { + override val coroutineContext: CoroutineContext + get() = this@convert.coroutineContext + + override suspend fun sendFrame(frame: ByteReadPacket) { + send(frame) + } + + override suspend fun receiveFrame(): ByteReadPacket { + return receive() + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index 0284e111..657dc977 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -21,11 +21,12 @@ import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* -@OptIn(TransportApi::class) +@RSocketTransportApi internal suspend inline fun connect( - connection: Connection, + connection: RSocketTransportSession.Sequential, isServer: Boolean, maxFragmentSize: Int, interceptors: Interceptors, @@ -99,6 +100,7 @@ internal suspend inline fun connect( is LeaseFrame -> frame.close().also { error("lease isn't implemented") } else -> frame.close() } + else -> streamsStorage.handleFrame(frame, responder) } } @@ -106,3 +108,18 @@ internal suspend inline fun connect( return requester } + +@RSocketTransportApi +internal suspend inline fun RSocketTransportSession.Sequential.receiveFrame( + @Suppress("DEPRECATION") bufferPool: ObjectPool, + block: (frame: Frame) -> T, +): T = + receiveFrame().readFrame(bufferPool).closeOnError(block) + +@RSocketTransportApi +internal suspend fun RSocketTransportSession.Sequential.sendFrame( + @Suppress("DEPRECATION") bufferPool: ObjectPool, + frame: Frame, +) { + frame.toPacket(bufferPool).closeOnError { sendFrame(it) } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt index ff77435f..904971dd 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,35 +14,38 @@ * limitations under the License. */ -@file:OptIn(TransportApi::class, RSocketLoggingApi::class) - package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.logging.* +import io.rsocket.kotlin.transport.* -internal fun Connection.logging(logger: Logger): Connection = +@RSocketLoggingApi +@RSocketTransportApi +internal fun RSocketTransportSession.Sequential.logging(logger: Logger): RSocketTransportSession.Sequential = if (logger.isLoggable(LoggingLevel.DEBUG)) LoggingConnection(this, logger) else this +@RSocketTransportApi +@RSocketLoggingApi private class LoggingConnection( - private val delegate: Connection, + private val delegate: RSocketTransportSession.Sequential, private val logger: Logger, -) : Connection by delegate { +) : RSocketTransportSession.Sequential by delegate { private fun ByteReadPacket.dumpFrameToString(): String { val length = remaining return copy().use { it.readFrame(pool).use { it.dump(length) } } } - override suspend fun send(packet: ByteReadPacket) { - logger.debug { "Send: ${packet.dumpFrameToString()}" } - delegate.send(packet) + override suspend fun sendFrame(frame: ByteReadPacket) { + logger.debug { "Send: ${frame.dumpFrameToString()}" } + delegate.sendFrame(frame) } - override suspend fun receive(): ByteReadPacket { - val packet = delegate.receive() + override suspend fun receiveFrame(): ByteReadPacket { + val packet = delegate.receiveFrame() logger.debug { "Receive: ${packet.dumpFrameToString()}" } return packet } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketClientTransport.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketClientTransport.kt new file mode 100644 index 00000000..794fc5d4 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketClientTransport.kt @@ -0,0 +1,23 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport + +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketClientTransport : RSocketTransport { + @RSocketTransportApi + public suspend fun createSession(): RSocketTransportSession +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketServerTransport.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketServerTransport.kt new file mode 100644 index 00000000..e753469c --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketServerTransport.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport + +import kotlinx.coroutines.* + +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketServerTransport : RSocketTransport { + @RSocketTransportApi + public suspend fun startServer(acceptor: RSocketServerAcceptor): Instance +} + +@RSocketTransportApi +public interface RSocketServerAcceptor { + public suspend fun acceptSession(session: RSocketTransportSession) +} + +// cancelling it will cancel server +// coroutineContext of transport should contain SupervisorJob +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketServerInstance : CoroutineScope diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransport.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransport.kt new file mode 100644 index 00000000..c856e476 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransport.kt @@ -0,0 +1,69 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport + +import kotlinx.coroutines.* +import kotlin.coroutines.* + +// coroutineContext of transport should contain SupervisorJob +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketTransport : CoroutineScope + +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketTransportBuilder { + @RSocketTransportApi + public fun buildTransport(context: CoroutineContext, target: Target): Transport +} + +@SubclassOptInRequired(RSocketTransportApi::class) +public abstract class RSocketTransportFactory< + Target, + Transport : RSocketTransport, + Builder : RSocketTransportBuilder, + >(private val createBuilder: () -> Builder) { + + @OptIn(RSocketTransportApi::class) + public operator fun invoke(context: CoroutineContext, target: Target, block: Builder.() -> Unit = {}): Transport { + return createBuilder().apply(block).buildTransport(context, target) + } +} + +// coroutineContext of transport engine should contain SupervisorJob +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketTransportEngine : CoroutineScope { + public fun createTransport(target: Target): Transport +} + +@SubclassOptInRequired(RSocketTransportApi::class) +public interface RSocketTransportEngineBuilder> { + @RSocketTransportApi + public fun buildEngine(context: CoroutineContext): Engine +} + +@SubclassOptInRequired(RSocketTransportApi::class) +public abstract class RSocketTransportEngineFactory< + Target, + Transport : RSocketTransport, + Engine : RSocketTransportEngine, + Builder : RSocketTransportEngineBuilder, + >(private val createBuilder: () -> Builder) { + + @OptIn(RSocketTransportApi::class) + public operator fun invoke(context: CoroutineContext, block: Builder.() -> Unit = {}): Engine { + return createBuilder().apply(block).buildEngine(context) + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransportApi.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransportApi.kt new file mode 100644 index 00000000..94feb24a --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransportApi.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport + +@RequiresOptIn( + level = RequiresOptIn.Level.ERROR, + message = """ + This is an API which is used to implement transport for RSocket, such as WS or TCP. + This API should not be used from general code + """ +) +public annotation class RSocketTransportApi diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransportSession.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransportSession.kt new file mode 100644 index 00000000..3367df85 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransportSession.kt @@ -0,0 +1,28 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport + +import io.ktor.utils.io.core.* +import kotlinx.coroutines.* + +@RSocketTransportApi +public sealed interface RSocketTransportSession : CoroutineScope { + public interface Sequential : RSocketTransportSession { + public suspend fun sendFrame(frame: ByteReadPacket) + public suspend fun receiveFrame(): ByteReadPacket + } +}